# 七、大数据和 Twitter 情感分析 “数据是新的石油。” ————未知 在本章中,我们将研究 AI 和数据科学的两个重要领域:**自然语言处理**(**NLP**)和大数据分析。 对于支持的示例应用,我们重新实现了 Twitter 主题标签项目的情感分析,该项目在第 1 章“开发人员对数据科学的观点”中进行了介绍, 我们利用 Jupyter 笔记本和 PixieDust 构建实时的仪表板,以分析从相关的推文流到特定实体(例如公司提供的产品)的数据,以提供情感信息,以及有关从相同推文中提取的趋势实体的其他信息。 在本章的最后,读者将学习如何将基于云的 NLP 服务(例如,IBM Watson 自然语言理解)集成到其应用中,以及如何在(Twitter)规模上使用诸如 Apache Spark。 与往常一样,我们将展示如何通过将实时仪表板实现为直接在 Jupyter 笔记本中运行的 PixieApp 来实现分析的操作。 # Apache Spark 入门 *大数据*一词可能会感到模糊不清。 考虑任何数据集大数据的截止点是什么? 是 10 GB,100 GB,1 TB 还是更多? 我喜欢的一个定义是:大数据是指数据无法容纳在单台计算机上可用的内存中。 多年以来,数据科学家一直被迫对大型数据集进行采样,因此它们可以放入一台机器中,但是随着并行计算框架能够将数据分布到一组机器中,并行处理框架开始发生变化,这种情况就开始发生变化。 整个数据集,当然前提是集群具有足够的计算机。 同时,随着云技术的进步,可以按需提供适应数据集大小的机器集群。 如今,有多个框架(大多数时候可以作为开源使用),可以提供强大,灵活的并行计算功能。 最受欢迎的包括 [Apache Hadoop](http://hadoop.apache.org),[Apache Spark](https://spark.apache.org) 和 [Dask](https://dask.pydata.org)。 对于我们的 *Twitter 情感分析*应用,我们将使用 Apache Spark,它在可伸缩性,可编程性和速度方面提供出色的性能。 此外,许多云提供商提供了一些 Spark 即服务的功能,使您能够在几分钟内按需创建大小合适的 Spark 集群。 一些 Spark 即服务的云提供商包括: * [Microsoft Azure](https://azure.microsoft.com/en-us/services/hdinsight/apache-spark) * [亚马逊网络服务](https://aws.amazon.com/emr/details/spark) * [Google 云端](https://cloud.google.com/dataproc) * [Databricks](https://databricks.com) * [IBM Cloud](https://www.ibm.com/cloud/analytics-engine) ### 注意 **注意**:Apache Spark 也可以很容易地安装在本地计算机上以进行测试,在这种情况下,群集节点是使用线程模拟的。 ## Apache Spark 架构 以下图显示了 Apache Spark 框架的主要组件: ![Apache Spark architecture](img/00145.jpeg) Spark 高级架构 * **Spark SQL**:此组件的核心数据结构是 Spark `DataFrame`,它使了解 SQL 语言的用户可以轻松处理结构化数据。 * **Spark Streaming**:模块用于处理流数据。 稍后我们将看到,我们将在示例应用中使用此模块,尤其是使用结构化流(在 Spark 2.0 中引入)。 * **MLlib**:模块,提供了功能丰富的机器学习库,可在 Spark 规模上工作。 * **GraphX**:模块用于执行图并行计算。 主要有两种使用 Spark 集群的方法,如下图所示: ![Apache Spark architecture](img/00146.jpeg) 使用 Spark 集群的两种方法 * **spark-submit**:用于在集群上启动 Spark 应用的 Shell 脚本 * **笔记本**:以交互方式针对 Spark 集群执行代码语句 关于`spark-submit` shell 脚本的内容不在本书的讨论范围内,[但是可以在以下位置找到官方文档](https://spark.apache.org/docs/latest/submitting-applications.html)。 在本章的其余部分,我们将重点介绍通过 Jupyter 笔记本与 Spark 集群进行交互。 ## 配置笔记本来配合 Spark 本节中的说明仅涉及在本地安装 Spark 以进行开发和测试。 在群集中手动安装 Spark 超出了本书的范围。 如果需要一个真正的集群,强烈建议使用基于云的服务。 默认情况下,本地 Jupyter 笔记本安装了纯 Python 内核。 要使用 Spark,用户必须使用以下步骤: 1. 通过从[这个页面](https://spark.apache.org/downloads.html)下载二进制发行版在本地安装 Spark。 2. 使用以下命令在临时目录中生成内核规范: ```py ipython kernel install --prefix /tmp ``` ### 注意 **注意**:只要声明以下消息,上述命令可能会生成警告消息,可以安全地忽略该警告消息: `Installed kernelspec python3 in /tmp/share/jupyter/kernels/python3` 3. 转到`/tmp/share/jupyter/kernels/python3`,然后编辑`kernel.json`文件,将以下键添加到 JSON 对象(将`<>`替换为安装 Spark 的目录路径,将`<>`替换为系统上安装的版本): ```py "env": { "PYTHONPATH": "<>/python/:<>/python/lib/py4j-<>-src.zip", "SPARK_HOME": "<>", "PYSPARK_SUBMIT_ARGS": "--master local[10] pyspark-shell", "SPARK_DRIVER_MEMORY": "10G", "SPARK_LOCAL_IP": "127.0.0.1", "PYTHONSTARTUP": "<>/python/pyspark/shell.py" } ``` 4. 您可能还需要自定义`display_name`键,使其具有唯一性,并且可以从 Juptyer UI 轻松识别。 如果需要了解现有内核的列表,可以使用以下命令: ```py jupyter kernelspec list ``` 前面的命令将为您提供内核名称列表以及本地文件系统上的关联路径。 从路径中,您可以打开`kernel.json`文件来访问`display_name`值。 例如: ```py Available kernels: pixiedustspark16 /Users/dtaieb/Library/Jupyter/kernels/pixiedustspark16 pixiedustspark21 /Users/dtaieb/Library/Jupyter/kernels/pixiedustspark21 pixiedustspark22 /Users/dtaieb/Library/Jupyter/kernels/pixiedustspark22 pixiedustspark23 /Users/dtaieb/Library/Jupyter/kernels/pixiedustspark23 ``` 5. 使用以下命令将内核与已编辑的文件一起安装: ```py jupyter kernelspec install /tmp/share/jupyter/kernels/python3 ``` ### 注意 注意:根据环境,运行前面的命令时,您可能会收到“权限被拒绝”错误。 在这种情况下,您可能希望使用`sudo`以管理员权限运行命令或使用`--user`开关,如下所示: `jupyter kernelspec install --user /tmp/share/jupyter/kernels/python3` 有关安装选项的更多信息,可以使用`-h`开关。 例如: ```py jupyter kernelspec install -h ``` 6. 重新启动笔记本服务器并开始使用新的 PySpark 内核。 幸运的是,PixieDust 提供了一个`install`脚本来自动执行上述手动步骤。 ### 注意 [您可以在此处找到有关此脚本的详细文档](https://pixiedust.github.io/pixiedust/install.html)。 简而言之,使用自动 PixieDust `install`脚本需要发出以下命令并遵循屏幕上的说明: ```py jupyter pixiedust install ``` 我们将在本章稍后深入研究 Spark 编程模型,但是现在,让我们在下一部分中定义 *Twitter 情感分析*应用的 MVP 要求。 # Twitter 情感分析应用 与往常一样,我们首先定义 MVP 版本的要求: * 连接到 Twitter 以获取由用户提供的查询字符串过滤的实时推文流 * 丰富推文以添加情感信息和从文本中提取的相关实体 * 使用实时图表显示仪表板,其中包含有关数据的各种统计信息,并按指定的时间间隔进行更新 * 系统应该能够扩展到 Twitter 数据大小 以下图显示了我们的应用架构的第一个版本: ![Twitter sentiment analysis application](img/00147.jpeg) Twitter 情感架构版本 1 对于版本 1,该应用将完全在单个 Python 笔记本中实现,并将调出 NLP 部分的外部服务。 为了进行扩展,我们当然必须将笔记本外部的某些处理外部化,但是为了进行开发和测试,我发现能够在单个笔记本中包含整个应用可以显着提高生产力。 至于库和框架,我们将使用 [Tweepy](http://www.tweepy.org) 将连接到 Twitter,[Apache Spark 结构化流](https://spark.apache.org/streaming),用于处理分布式集群和 [Watson Developer Cloud Python SDK](https://github.com/watson-developer-cloud/python-sdk) 来访问 [IBM Watson Natural 语言理解](https://www.ibm.com/watson/services/natural-language-understanding)服务。 # 第 1 部分——使用 Spark 结构化流获取数据 为了获取数据,我们使用 Tweepy,它提供了一个优雅的 Python 客户端库来访问 Twitter API。 Tweepy 涵盖的 API 非常广泛,详细介绍超出了本书的范围,[但是您可以在 Tweepy 官方网站上找到完整的 API 参考](http://tweepy.readthedocs.io/en/v3.6.0/cursor_tutorial.html)。 您可以使用`pip install`命令直接从 PyPi 安装 Tweepy 库。 以下命令显示如何使用`!`指令从笔记本计算机安装它: ```py !pip install tweepy ``` ### 注意 **注意**:当前使用的 Tweepy 版本是 3.6.0。 安装库后,不要忘记重新启动内核。 ## 数据管道的架构图 在我们开始深入研究数据管道的每个组件之前,最好先了解其总体架构并了解计算流程。 如下图所示,我们首先创建一个 Tweepy 流,该流将原始数据写入 CSV 文件。 然后,我们创建一个 Spark Streaming `DataFrame`,该框架读取 CSV 文件,并定期使用新数据进行更新。 从 Spark Streaming`DataFrame`中,我们使用 SQL 创建一个 Spark 结构化查询并将其结果存储在 Parquet 数据库中: ![Architecture diagram for the data pipeline](img/00148.jpeg) 流计算流程 ## 通过 Twitter 执行认证 在使用任何 Twitter API 之前,建议先通过系统进行认证。 [OAuth 2.0 协议](https://oauth.net)是最常用的认证机制之一,它使第三方应用能够访问网络上的服务。 您需要做的第一件事是获取 OAuth 协议用来验证您身份的一组密钥字符串: * **用户密钥**:唯一标识客户端应用的字符串(也称为 API 密钥)。 * **使用者密码**:仅应用和 Twitter OAuth 服务器知道的密码字符串。 可以认为它就像一个密码。 * **访问令牌**:使用字符串来验证您的请求。 在授权阶段还可以使用此令牌来确定应用的访问级别。 * **访问令牌密钥**:类似于用户密钥,这是与访问令牌一起发送的秘密字符串,用作密码。 要生成上述密钥字符串,您需要转到[这个页面](http://apps.twitter.com),使用常规的 Twitter 用户 ID 和密码进行认证,然后按照以下步骤操作: 1. 使用**创建新应用**按钮创建一个新的 Twitter 应用。 2. 填写应用详细信息,同意开发者协议,然后单击**创建您的 Twitter 应用**按钮。 ### 提示 **注意**:请确保您的手机号码已添加到个人资料中,否则在创建 Twitter 应用时会出现错误。 您可以为 **必填网站** 的强制输入提供随机 URL,并将 **URL** 的输入保留为空白,因为这是可选的回调 URL。 3. 单击**密钥和访问令牌**选项卡以获取使用者和访问令牌。 您可以随时使用此页面上的按钮重新生成这些令牌。 如果这样做,则还需要更新您的应用代码中的值。 为了更轻松地维护代码,让我们将这些标记放在笔记本顶部的自己的变量中,并创建`tweepy.OAuthHandler`类,稍后我们将使用它: ```py from tweepy import OAuthHandler # Go to http://apps.twitter.com and create an app. # The consumer key and secret will be generated for you after consumer_key="XXXX" consumer_secret="XXXX" # After the step above, you will be redirected to your app's page. # Create an access token under the "Your access token" section access_token="XXXX" access_token_secret="XXXX" auth = OAuthHandler(consumer_key, consumer_secret) auth.set_access_token(access_token, access_token_secret) ``` ## 创建 Twitter 流 为了实现我们的应用,[我们只需要使用此处记录的 Twitter 流 API](http://tweepy.readthedocs.io/en/v3.5.0/streaming_how_to.html)。 在此步骤中,我们创建一个 Twitter 流,将输入的数据存储到本地文件系统上的 CSV 文件中。 这是使用从`tweepy.streaming.StreamListener`继承的自定义`RawTweetsListener`类完成的。 通过覆盖`on_data`方法,可以完成对传入数据的自定义处理。 在我们的例子中,我们想使用来自标准 Python `csv`模块的`DictWriter`将传入的数据从 JSON 转换为 CSV。 由于 Spark Streaming 文件输入源仅在输入目录中创建新文件时才触发,因此我们不能简单地将数据追加到现有文件中。 取而代之的是,我们将数据缓冲到一个数组中,并在缓冲区达到容量后将其写入磁盘。 ### 注意 为简单起见,该实现不包括在处理完文件后清理文件。 此实现的另一个次要限制是,我们当前要等到缓冲区被填满后才能写入文件,如果没有新的推文出现,理论上可能会花费很长时间。 `RawTweetsListener`的代码如下所示: ```py from six import iteritems import json import csv from tweepy.streaming import StreamListener class RawTweetsListener(StreamListener): def __init__(self): self.buffered_data = [] self.counter = 0 def flush_buffer_if_needed(self): "Check the buffer capacity and write to a new file if needed" length = len(self.buffered_data) if length > 0 and length % 10 == 0: with open(os.path.join( output_dir, "tweets{}.csv".format(self.counter)), "w") as fs: self.counter += 1 csv_writer = csv.DictWriter( fs, fieldnames = fieldnames) for data in self.buffered_data: csv_writer.writerow(data) self.buffered_data = [] def on_data(self, data): def transform(key, value): return transforms[key](value) if key in transforms else value self.buffered_data.append( {key:transform(key,value) \ for key,value in iteritems(json.loads(data)) \ if key in fieldnames} ) self.flush_buffer_if_needed() return True def on_error(self, status): print("An error occured while receiving streaming data: {}".format(status)) return False ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode1.py)。 从前面的代码中需要注意的重要事项是: * 来自 Twitter API 的每条推文都包含大量数据,我们选择继续使用`field_metadata`变量的字段。 我们还定义了一个全局变量`fieldnames`,该全局变量保存要从流中捕获的字段列表,而一个`transforms`变量则包含一个字典,该字典的所有字段名称均具有转换函数作为键,而转换函数本身作为值: ```py from pyspark.sql.types import StringType, DateType from bs4 import BeautifulSoup as BS fieldnames = [f["name"] for f in field_metadata] transforms = { item['name']:item['transform'] for item in field_metadata if "transform" in item } field_metadata = [ {"name": "created_at","type": DateType()}, {"name": "text", "type": StringType()}, {"name": "source", "type": StringType(), "transform": lambda s: BS(s, "html.parser").text.strip() } ] ``` ### 注意 [您可以在此处找到的文件文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode2.py)。 * CSV 文件写在`output_dir`中,该文件在其自己的变量中定义。 在开始时,我们首先删除目录及其内容: ```py import shutil def ensure_dir(dir, delete_tree = False): if not os.path.exists(dir): os.makedirs(dir) elif delete_tree: shutil.rmtree(dir) os.makedirs(dir) return os.path.abspath(dir) root_dir = ensure_dir("output", delete_tree = True) output_dir = ensure_dir(os.path.join(root_dir, "raw")) ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode3.py)。 * `field_metadata`包含 Spark 数据类型,我们稍后将在创建 Spark 流查询时使用它来构建架构。 * `field_metadata`还包含一个可选的变换`lambda`函数,用于在将值写入磁盘之前清除该值。 作为参考,Python 中的 Lambda 函数是内联定义的匿名函数(请参见[这个页面](https://docs.python.org/3/tutorial/controlflow.html#lambda-expressions))。 我们将其用于通常作为 HTML 片段返回的源字段。 在此 Lambda 函数中,我们使用 BeautifulSoup 库(在上一章中也使用过)仅提取文本,如以下代码片段所示: ```py lambda s: BS(s, "html.parser").text.strip() ``` 现在`RawTweetsListener`已创建,我们定义了`start_stream`函数,稍后将在 PixieApp 中使用。 此函数将搜索词数组作为输入,并使用`filter`方法开始新的流: ```py from tweepy import Stream def start_stream(queries): "Asynchronously start a new Twitter stream" stream = Stream(auth, RawTweetsListener()) stream.filter(track=queries, async=True) return stream ``` ### 注意 注意`async=True`参数传递给`stream.filter`。 需要确保该函数不会被阻止,这将阻止我们在笔记本中运行任何其他代码。 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode4.py)。 以下代码启动流,该流将接收其中包含单词`baseball`的推文: ```py stream = start_stream(["baseball"]) ``` 当运行前面的代码时,笔记本计算机中不生成任何输出。 但是,您可以从运行笔记本的路径中看到在输出目录(即`../output/raw`)中生成的文件(即`tweets0.csv`,`tweets1.csv`等)。 要停止流,我们只需调用`disconnect`方法,如下所示: ```py stream.disconnect() ``` ## 创建 Spark 流数据框架 参考架构图,下一步是创建一个将`output_dir`用作源文件输入的 Spark Streaming `DataFrame tweets_sdf`。 我们可以将 Streaming`DataFrame`视为一个无界表,其中随着新数据从流到达而不断添加新行。 ### 注意 **注**:Spark 结构化流支持多种类型的输入源,包括文件,Kafka,套接字和速率。 (Socket 和 Rate 均仅用于测试。) 下图取自 Spark 网站,并很好地解释了如何将新数据附加到 Streaming `DataFrame`: ![Creating a Spark Streaming DataFrame](img/00149.jpeg) 流式`DataFrame`流 [来源](https://spark.apache.org/docs/latest/img/structured-streaming-stream-as-a-table.png) Spark Streaming Python API 提供了一种优雅的方法,可以使用`spark.readStream`属性创建 Streaming `DataFrame`,该属性创建一个新的`pyspark.sql.streamingreamReader`对象,该对象方便地使您链接方法调用,并具有创建更清晰代码的额外好处(请参见[这个页面](https://en.wikipedia.org/wiki/Method_chaining),以获取有关此模式的更多详细信息)。 例如,要创建 CSV 文件流,我们使用`csv`调用`format`方法,链接适用的选项,然后使用目录路径调用`load`方法: ```py schema = StructType( [StructField(f["name"], f["type"], True) for f in field_metadata] ) csv_sdf = spark.readStream\ .format("csv")\ .option("schema", schema)\ .option("multiline", True)\ .option("dateFormat", 'EEE MMM dd kk:mm:ss Z y')\ .option("ignoreTrailingWhiteSpace", True)\ .option("ignoreLeadingWhiteSpace", True)\ .load(output_dir) ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode5.py)。 `spark.readStream`还提供了一种方便的高级`csv`方法,该方法将路径作为选项的第一个参数和关键字参数: ```py csv_sdf = spark.readStream \ .csv( output_dir, schema=schema, multiLine = True, dateFormat = 'EEE MMM dd kk:mm:ss Z y', ignoreTrailingWhiteSpace = True, ignoreLeadingWhiteSpace = True ) ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode6.py)。 您可以通过调用应返回`true`的`isStreaming`方法来验证`csv_sdf``DataFrame`确实是流数据帧。 以下代码还向`printSchema`添加了一个调用,以验证模式是否按预期遵循`field_metadata`配置: ```py print(csv_sdf.isStreaming) csv_sdf.printSchema() ``` 返回值: ```py root |-- created_at: date (nullable = true) |-- text: string (nullable = true) |-- source: string (nullable = true) ``` 在继续下一步之前,重要的是要了解`csv_sdf` Streaming`DataFrame`如何适合结构化流编程模型,以及它具有哪些限制。 Spark 底层 API 定义了**弹性分布式数据集**(**RDD**)数据结构,该数据结构封装了管理分布式数据的所有底层复杂性。 诸如容错之类的功能(由于任何原因而崩溃的群集节点将在无需开发人员干预的情况下透明地重新启动)由框架自动处理。 RDD 操作有两种类型:转换和操作。 **转换**是对现有 RDD 的逻辑操作,除非调用某个操作(延迟执行),否则它们不会立即在集群上执行。 转换的输出是新的 RDD。 在内部,Spark 维护一个 RDD 非循环有向图,该图可跟踪导致创建 RDD 的所有沿袭,这在从服务器故障中恢复时非常有用。 示例转换包括`map`,`flatMap`,`filter`,`sample`和`distinct`。 在`DataFrame`(内部由 RDD 支持)上进行转换的情况也是如此,这些转换具有包括 SQL 查询的优点。 另一方面,**操作**不产生其他 RDD,而是对实际的分布式数据执行操作以返回非 RDD 值。 动作的示例包括`reduce`,`collect`,`count`和`take`。 如前所述,`csv_sdf`是一个 Streaming `DataFrame`,这意味着数据不断被添加到其中,因此我们只能对其进行转换,而不能对其进行操作。 为了解决这个问题,我们必须首先使用作为`pyspark.sql.streaming.DataStreamWriter`对象的`csv_sdf.writeStream`创建一个流查询。 流查询负责将结果发送到输出接收器。 然后我们可以使用`start()`方法运行流式查询。 Spark Streaming 支持多种输出接收器类型: * **文件**:支持所有经典文件格式,包括 JSON,CSV 和 Parquet * **Kafka**:直接写一个或多个 Kafka 主题 * **Foreach**:对集合中的每个元素运行任意计算 * **控制台**:将输出打印到系统控制台(主要用于调试) * **存储器**:输出存储在存储器中 在的下一部分中,我们将在`csv_sdf`上创建并运行结构化查询,该查询带有一个输出接收器,该输出接收器以 Parquet 格式存储输出。 ## 创建并运行结构化查询 使用`tweets_sdf`流式数据帧,我们创建了一个流式查询`tweet_streaming_query`,该查询使用*附加*输出模式将数据写入 Parquet 格式。 ### 注意 **注意**:Spark 流查询支持三种输出模式:**完整**,其中在每个触发器中写入整个表,**附加**,其中仅包含自上一个触发器以来的增量行写入,**更新**,其中仅写入已修改的行。 Parquet 是列式数据库格式,可为分布式分析提供高效,可扩展的存储。 [您可以在以下位置找到有关 Parquet 格式的更多信息](https://parquet.apache.org)。 以下代码创建并启动`tweet_streaming_query`流查询: ```py tweet_streaming_query = csv_sdf \ .writeStream \ .format("parquet") \ .option("path", os.path.join(root_dir, "output_parquet")) \ .trigger(processingTime="2 seconds") \ .option("checkpointLocation", os.path.join(root_dir, "output_chkpt")) \ .start() ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode7.py)。 同样,可以使用`stop()`方法停止流查询,如下所示: ```py tweet_streaming_query.stop() ``` 在前面的代码中,我们使用`path`选项指定 Parquet 文件的位置,并使用`checkpointLocation`指定在服务器故障的情况下将使用的恢复数据的位置。 我们还为从流中读取新数据和要添加到 Parquet 数据库中的新行指定触发间隔。 为了测试,每次`output_dir`目录中生成新的原始 CSV 文件时,也可以使用`console`接收器查看正在读取的新行: ```py tweet_streaming_query = csv_sdf.writeStream\ .outputMode("append")\ .format("console")\ .trigger(processingTime='2 seconds')\ .start() ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode8.py)。 您可以在 Spark 集群的主节点的系统输出中看到结果(您将需要物理访问主节点机器并查看日志文件,因为不幸的是,输出未打印到笔记本本身中,因为日志文件的位置取决于群集管理软件;有关更多信息,请参阅特定的文档)。 以下是显示特定批次的示例结果(标识符已被屏蔽): ```py ------------------------------------------- Batch: 17 ------------------------------------------- +----------+--------------------+-------------------+ |created_at| text| source| +----------+--------------------+-------------------+ |2018-04-12|RT @XXXXXXXXXXXXX...|Twitter for Android| |2018-04-12|RT @XXXXXXX: Base...| Twitter for iPhone| |2018-04-12|That's my roommat...| Twitter for iPhone| |2018-04-12|He's come a long ...| Twitter for iPhone| |2018-04-12|RT @XXXXXXXX: U s...| Twitter for iPhone| |2018-04-12|Baseball: Enid 10...| PushScoreUpdates| |2018-04-12|Cubs and Sox aren...| Twitter for iPhone| |2018-04-12|RT @XXXXXXXXXX: T...| RoundTeam| |2018-04-12|@XXXXXXXX that ri...| Twitter for iPhone| |2018-04-12|RT @XXXXXXXXXX: S...| Twitter for iPhone| +----------+--------------------+-------------------+ ``` ## 监视活动的流查询 启动流查询时,Spark 会分配群集资源。 因此,重要的是管理和监视这些查询,以确保不会耗尽群集资源。 您可以随时获取所有正在运行的查询的列表,如以下代码所示: ```py print(spark.streams.active) ``` 结果: ```py [, ] ``` 然后,您可以使用以下查询监视属性来深入了解每个查询的详细信息: * `id`:返回查询的唯一标识符,该标识符在从检查点数据重新启动后持续存在 * `runId`:返回为当前会话生成的唯一 ID * `explain()`:打印查询的详细说明 * `recentProgress`:返回最新进度更新的数组 * `lastProgress:`返回最新进度 以下代码显示每个活动查询的最新进度: ```py import json for query in spark.streams.active: print("-----------") print("id: {}".format(query.id)) print(json.dumps(query.lastProgress, indent=2, sort_keys=True)) ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode9.py)。 第一个查询的结果如下所示: ```py ----------- id: b621e268-f21d-4eef-b6cd-cb0bc66e53c4 { "batchId": 18, "durationMs": { "getOffset": 4, "triggerExecution": 4 }, "id": "b621e268-f21d-4eef-b6cd-cb0bc66e53c4", "inputRowsPerSecond": 0.0, "name": null, "numInputRows": 0, "processedRowsPerSecond": 0.0, "runId": "d2459446-bfad-4648-ae3b-b30c1f21be04", "sink": { "description": "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@586d2ad5" }, "sources": [ { "description": "FileStreamSource[file:/Users/dtaieb/cdsdev/notebookdev/Pixiedust/book/Chapter7/output/raw]", "endOffset": { "logOffset": 17 }, "inputRowsPerSecond": 0.0, "numInputRows": 0, "processedRowsPerSecond": 0.0, "startOffset": { "logOffset": 17 } } ], "stateOperators": [], "timestamp": "2018-04-12T21:40:10.004Z" } ``` 作为读者的练习,构建一个 PixieApp 很有用,该 PixieApp 提供一个实时仪表板,其中包含有关每个活动流查询的更新详细信息。 ### 注意 **注意**:我们将在“第 3 部分——创建实时仪表板 PixieApp”中展示如何构建此 PixieApp。 ## 从 Parquet 文件创建批量`DataFrame` ### 注意 **注意**:对于本章的其余部分,我们将批量 Spark`DataFrame`定义为经典的 Spark `DataFrame`,它是非流式的。 此流计算流程的最后一步是创建一个或多个批量`DataFrame`,我们可以将其用于构建分析和数据可视化。 我们可以认为这最后一步是对数据进行快照以进行更深入的分析。 有两种方法可以通过编程方式从 Parquet 文件中加载批量`DataFrame`: * 使用`spark.read`(注意,我们不像以前那样使用`spark.readStream`): ```py parquet_batch_df = spark.read.parquet(os.path.join(root_dir, "output_parquet")) ``` * 使用`spark.sql`: ```py parquet_batch_df = spark.sql( "select * from parquet.'{}'".format( os.path.join(root_dir, "output_parquet") ) ) ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode10.py)。 这种方法的好处是,我们可以使用任何 ANSI SQL 查询来加载数据,而不是使用在第一种方法中必须使用的等效低级`DataFrame`API。 然后,我们可以通过重新运行前面的代码并重新创建`DataFrame`来定期刷新数据。 现在,我们准备对数据进行进一步的分析,例如,通过对数据运行 PixieDust `display()`方法来创建可视化效果: ```py import pixiedust display(parquet_batch_df) ``` 我们选择**条形图**菜单并在**键**字段区域中拖放`source`字段。 由于我们只想显示前 10 条推文,因此我们在**要显示的行数**字段中设置此值。 以下屏幕截图显示了 PixieDust 选项对话框: ![Creating a batch DataFrame from the Parquet files](img/00150.jpeg) 用于显示前 10 条推文来源的“选项”对话框 在单击 **OK** 之后,我们看到以下结果: ![Creating a batch DataFrame from the Parquet files](img/00151.jpeg) 该图表按来源显示与棒球相关的推文数量 在本节中,我们已经看到了如何使用 Tweepy 库创建 Twitter 流,清理原始的数据并将其存储在 CSV 文件中,创建 Spark Streaming `DataFrame`,对其运行流查询并将其存储在一个 Parquet 数据库中,从 Parquet 文件创建一个批量`DataFrame`,并使用 PixieDust `display()`可视化数据。 ### 注意 [可在以下位置找到“第 1 部分-使用 Spark 结构化流获取数据”的完整笔记本](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/Twitter%20Sentiment%20Analysis%20-%20Part%201.ipynb)。 在下一部分中,我们将研究使用 IBM Watson Natural Language 了解服务通过情感和实体提取来丰富数据。 # 第 2 部分——使用情感和最相关的提取实体丰富数据 在这一部分中,我们使用情感信息来丰富 Twitter 数据,例如*正*,*负*和*中性*。 我们还希望从推文中提取最相关的实体,例如运动,组织和位置。 这些额外的信息将通过实时仪表板进行分析和可视化,我们将在下一部分中构建。 用于从非结构化文本中提取情感和实体的算法属于计算机科学和人工智能领域,称为**自然语言处理**(**NLP**)。 网络上有很多教程,其中提供了有关如何提取情感的算法示例。 例如,可以在[这个页面](https://github.com/scikit-learn/scikit-learn/blob/master/doc/tutorial/text_analytics/working_with_text_data.rst)。 但是,对于此示例应用,我们将不会构建自己的 NLP 算法。 相反,我们将选择一种基于云的服务,该服务提供文本分析,例如情感和实体提取。 当您有通用要求(例如不需要训练定制模型)时,这种方法非常有效,但是即使那样,大多数服务提供商现在都提供了实现此目的的工具。 与创建您自己的模型相比,使用基于云的供应商具有主要优势,例如节省了开发时间以及更好的准确率和性能。 通过一个简单的 REST 调用,我们将能够生成所需的数据并将其集成到我们的应用流中。 而且,如果需要,更改提供者将非常容易,因为负责与服务接口的代码已被很好地隔离了。 对于此示例应用,我们将使用 **IBM Watson 自然语言理解**(**NLU**)服务,该服务是 IBM Watson 认知服务家族的一部分,可在 IBM Cloud 上使用。 ## IBM Watson Natural 语言理解服务入门 对于每个云提供商,提供新服务的过程通常是相同的。 登录后,您将转到服务目录页面,可在其中搜索特定服务。 要登录到 IBM Cloud,只需转到[这个页面](https://console.bluemix.net)并创建一个免费的 IBM 帐户(如果您还没有)。 进入仪表板后,可以通过多种方式搜索 IBM Watson NLU 服务: * 单击左上方的菜单,然后选择 **Watson**,选择**浏览服务**,然后在服务列表中找到**自然语言理解**条目。 * 单击右上角的“创建资源”按钮进入目录。 进入目录后,您可以在搜索栏中搜索“自然语言理解”,如以下屏幕截图所示: ![Getting started with the IBM Watson Natural Language Understanding service](img/00152.jpeg) 在服务目录中搜索 Watson NLU 然后,您可以单击**自然语言理解**来设置新实例。 云提供商通常会为某些服务提供免费或基于试用的计划,这并不稀奇,幸运的是 Watson NLU 提供了其中一种服务,其局限性在于您只能训练一个自定义模型,每个模型最多可处理 30,000 个 NLU 项目。 月(足够用于我们的示例应用)。 选择 **Lite**(免费)计划并单击**创建**按钮后,新配置的实例将出现在仪表板上并准备接受请求。 ### 注意 **注意**:创建服务后,您可能会重定向到 NLU 服务*入门文档*。 如果是这样,只需导航回到仪表板,您将在其中看到列出的新服务实例。 下一步是通过进行 REST 调用来测试我们笔记本的服务。 每个服务都提供有关如何使用它的详细文档,包括 API 参考。 在笔记本中,我们可以根据 API 参考使用 request 包进行 GET,POST,PUT 或 DELETE 调用,但是强烈建议检查该服务是否为 SDK 提供了对 API 的高级编程访问。 幸运的是,IBM Watson 提供了`watson_developer_cloud`开源库,其中包括多个开源 SDK,这些 SDK 支持某些最受欢迎的语言,包括 Java,Python 和 Node.js。 对于本项目,[我们将使用 Python SDK 和位于此处的源代码和代码示例](https://github.com/watson-developer-cloud/python-sdk)。 以下`pip`命令直接从 Jupyter 笔记本安装`watson_developer_cloud`包: ```py !pip install Watson_developer_cloud ``` ### 注意 注意命令前面的`!`表示它是一个 shell 命令。 **注意**:安装完成后,不要忘记重新启动内核。 大多数云服务提供商都使用一种通用模式来让消费者通过该服务进行认证,该模式包括从将嵌入在客户端应用中的服务控制台仪表板生成一组凭据。 要生成凭证,只需单击 Watson NLU 实例的**服务凭证**选项卡,然后单击**新凭证**按钮。 这将以 JSON 格式生成一组新的凭据,如以下屏幕截图所示: ![Getting started with the IBM Watson Natural Language Understanding service](img/00153.jpeg) 为 Watson NLU 服务生成新的凭证 现在我们有了服务的凭据,我们可以创建一个`NaturalLanguageUnderstandingV1`对象,该对象将提供对 REST API 的编程访问,如以下代码所示: ```py from watson_developer_cloud import NaturalLanguageUnderstandingV1 from watson_developer_cloud.natural_language_understanding_v1 import Features, SentimentOptions, EntitiesOptions nlu = NaturalLanguageUnderstandingV1( version='2017-02-27', username='XXXX', password='XXXX' ) ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode11.py)。 **注意**:在前面的代码中,用服务凭据中的相应用户名和密码替换`XXXX`文本。 [`version`参数是指 API 的特定版本。 要了解最新版本,请转到位于此处的官方文档页面](https://www.ibm.com/watson/developercloud/natural-language-understanding/api/v1)。 在继续构建应用之前,让我们花一点时间来了解 Watson Natural Language 服务提供的文本分析功能,其中包括: * 情感 * 实体 * 概念 * 类别目录 * 感情 * 关键词 * 关系 * 语义角色 在我们的应用中,丰富 Twitter 数据发生在`RawTweetsListener`中,我们在其中创建了一个`enrich`方法,该方法将从`on_data`处理器方法中调用。 在此方法中,我们使用 Twitter 数据和仅包含情感和实体的特征列表调用`nlu.analyze`方法,如以下代码所示: ### 注意 **注意**:`[[RawTweetsListener]]`表示这意味着以下代码是名为`RawTweetsListener`的类的一部分,并且用户不应尝试在没有完整类的情况下照常运行代码。 始终可以始终参考完整的笔记本作为参考。 ```py [[RawTweetsListener]] def enrich(self, data): try: response = nlu.analyze( text = data['text'], features = Features( sentiment=SentimentOptions(), entities=EntitiesOptions() ) ) data["sentiment"] = response["sentiment"]["document"]["label"] top_entity = response["entities"][0] if len(response["entities"]) > 0 else None data["entity"] = top_entity["text"] if top_entity is not None else "" data["entity_type"] = top_entity["type"] if top_entity is not None else "" return data except Exception as e: self.warn("Error from Watson service while enriching data: {}".format(e)) ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode12.py)。 然后将结果存储在`data`对象中,该对象将被写入 CSV 文件。 我们还防止意外的异常跳过当前的推文并记录警告消息,而不是让异常冒泡,这会阻止 Twitter 流。 ### 注意 **注意**:当推文数据使用服务不支持的语言时,会发生最常见的异常。 我们使用第 5 章,“最佳实践和高级 PixieDust 概念”中描述的`@Logger`装饰器将消息记录到 PixieDust 日志记录框架中。 提醒一下,您可以使用另一个单元格中的`%pixiedustLog`魔术来查看日志消息。 我们仍然需要更改架构元数据以包括以下新字段: ```py field_metadata = [ {"name": "created_at", "type": DateType()}, {"name": "text", "type": StringType()}, {"name": "source", "type": StringType(), "transform": lambda s: BS(s, "html.parser").text.strip() }, {"name": "sentiment", "type": StringType()}, {"name": "entity", "type": StringType()}, {"name": "entity_type", "type": StringType()} ] ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode13.py)。 最后,我们更新`on_data`处理器以调用`enrich`方法,如下所示: ```py def on_data(self, data): def transform(key, value): return transforms[key](value) if key in transforms else value data = self.enrich(json.loads(data)) if data is not None: self.buffered_data.append( {key:transform(key,value) \ for key,value in iteritems(data) \ if key in fieldnames} ) self.flush_buffer_if_needed() return True ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode14.py)。 当我们重新启动 Twitter 流并创建 Spark Streaming`DataFrame`时,我们可以使用以下代码来验证我们是否具有正确的架构: ```py schema = StructType( [StructField(f["name"], f["type"], True) for f in field_metadata] ) csv_sdf = spark.readStream \ .csv( output_dir, schema=schema, multiLine = True, dateFormat = 'EEE MMM dd kk:mm:ss Z y', ignoreTrailingWhiteSpace = True, ignoreLeadingWhiteSpace = True ) csv_sdf.printSchema() ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode15.py)。 哪个显示了以下预期结果: ```py root |-- created_at: date (nullable = true) |-- text: string (nullable = true) |-- source: string (nullable = true) |-- sentiment: string (nullable = true) |-- entity: string (nullable = true) |-- entity_type: string (nullable = true) ``` 同样,当我们使用`console`接收器运行结构化查询时,数据将在 Spark 主节点的控制台中分批显示,如下所示: ```py ------------------------------------------- Batch: 2 ------------------------------------------- +----------+---------------+---------------+---------+------------+-------------+ |created_at| text| source|sentiment| entity| entity_type| +----------+---------------+---------------+---------+------------+-------------+ |2018-04-14|Some little ...| Twitter iPhone| positive| Drew| Person|d |2018-04-14|RT @XXXXXXXX...| Twitter iPhone| neutral| @XXXXXXXXXX|TwitterHandle| |2018-04-14|RT @XXXXXXXX...| Twitter iPhone| neutral| baseball| Sport| |2018-04-14|RT @XXXXXXXX...| Twitter Client| neutral| @XXXXXXXXXX|TwitterHandle| |2018-04-14|RT @XXXXXXXX...| Twitter Client| positive| @XXXXXXXXXX|TwitterHandle| |2018-04-14|RT @XXXXX: I...|Twitter Android| positive| Greg XXXXXX| Person| |2018-04-14|RT @XXXXXXXX...| Twitter iPhone| positive| @XXXXXXXXXX|TwitterHandle| |2018-04-14|RT @XXXXX: I...|Twitter Android| positive| Greg XXXXXX| Person| |2018-04-14|Congrats to ...|Twitter Android| positive| softball| Sport| |2018-04-14|translation:...| Twitter iPhone| neutral| null| null| +----------+---------------+---------------+---------+------------+-------------+ ``` 最后,我们使用 Parquet `output`接收器运行结构化查询,创建一个批量`DataFrame`,并使用 PixieDust `display()`浏览数据以显示例如按情感分类的推文数(`positive`,`negative`,`neutral`)由实体聚类,如下图所示: ![Getting started with the IBM Watson Natural Language Understanding service](img/00154.jpeg) 条形图显示按实体分组的按情感分类的推文数量 ### 注意 [“第 2 部分——使用情感丰富数据”的完整笔记本](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/Twitter%20Sentiment%20Analysis%20-%20Part%202.ipynb)。 如果您正在运行它,建议您通过向架构中添加更多字段,运行不同的 SQL 查询并使用 PixieDust `display()`可视化数据来进行试验。 在下一部分中,我们将构建一个仪表板,以显示有关 Twitter 数据的多个指标。 # 第 3 部分——创建实时仪表板 PixieApp 与往常一样,我们首先需要定义仪表板的 MVP 版本的要求。 这次,我们将从敏捷方法中借用一个称为**用户故事**的工具,其中从用户的角度描述了我们要从构建的功能。 敏捷方法论还规定,通过将不同用户分类为角色,可以充分理解将与软件交互的不同用户的上下文。 在我们的案例中,我们只会使用一种角色:*市场总监 Frank*,他希望从消费者在社交媒体上谈论的内容中获得实时见解。 用户的故事是这样的: * 弗兰克(Frank)输入搜索查询,例如产品名称 * 然后显示一个仪表板,该仪表板显示一组图表,这些图表显示有关用户情感(积极,消极,中立)的指标 * 仪表板还包含推文中要说出的所有实体的词云 * 此外,仪表板具有一个选项,可以显示当前活动的所有 Spark Streaming 查询的实时进度 ### 注意 **注意**:Frank 确实不需要最后一个功能,但是无论如何我们在此展示它作为前面给出的练习的示例实现。 ## 将分析重构为自己的方法 在开始之前,我们需要重构启动 Twitter 流的代码并将 Spark Streaming`DataFrame`创建为将在 PixieApp 中调用的自己的方法。 `start_stream,` `start_streaming_dataframe`和`start_parquet_streaming_query`方法如下: ```py def start_stream(queries): "Asynchronously start a new Twitter stream" stream = Stream(auth, RawTweetsListener()) stream.filter(track=queries, languages=["en"], async=True) return stream ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode16.py)。 ```py def start_streaming_dataframe(output_dir): "Start a Spark Streaming DataFrame from a file source" schema = StructType( [StructField(f["name"], f["type"], True) for f in field_metadata] ) return spark.readStream \ .csv( output_dir, schema=schema, multiLine = True, timestampFormat = 'EEE MMM dd kk:mm:ss Z yyyy', ignoreTrailingWhiteSpace = True, ignoreLeadingWhiteSpace = True ) ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode17.py)。 ```py def start_parquet_streaming_query(csv_sdf): """ Create and run a streaming query from a Structured DataFrame outputing the results into a parquet database """ streaming_query = csv_sdf \ .writeStream \ .format("parquet") \ .option("path", os.path.join(root_dir, "output_parquet")) \ .trigger(processingTime="2 seconds") \ .option("checkpointLocation", os.path.join(root_dir, "output_chkpt")) \ .start() return streaming_query ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode18.py)。 在准备工作的中,我们还需要管理 PixieApp 将创建的不同流的生命周期,并确保在用户重新启动仪表板时正确​​停止了基础资源。 为了解决这个问题,我们创建了一个`StreamsManager`类,该类封装了 Tweepy `twitter_stream`和 CSV Streaming `DataFrame`。 此类具有`reset`方法,该方法将停止`twitter_stream`,停止所有活动的流查询,删除从先前查询创建的所有输出文件,并使用新的查询字符串开始一个新的输出文件。 如果在没有查询字符串的情况下调用`reset`方法,则我们不会启动新的流。 我们还创建了一个全局`streams_manager`实例,即使重新启动仪表板,该实例也将跟踪当前状态。 由于用户可以重新运行包含全局`streams_manager`的单元格,因此我们需要确保在删除当前全局实例的时自动调用`reset`方法。 为此,我们覆盖了对象的`__del__`方法,这是 Python 实现析构器并调用`reset`的方法。 `StreamsManager`的代码如下所示: ```py class StreamsManager(): def __init__(self): self.twitter_stream = None self.csv_sdf = None def reset(self, search_query = None): if self.twitter_stream is not None: self.twitter_stream.disconnect() #stop all the active streaming queries and re_initialize the directories for query in spark.streams.active: query.stop() # initialize the directories self.root_dir, self.output_dir = init_output_dirs() # start the tweepy stream self.twitter_stream = start_stream([search_query]) if search_query is not None else None # start the spark streaming stream self.csv_sdf = start_streaming_dataframe(output_dir) if search_query is not None else None def __del__(self): # Automatically called when the class is garbage collected self.reset() streams_manager = StreamsManager() ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode19.py)。 ## 创建 PixieApp 就像第 6 章,“使用 TensorFlow 的图像识别”一样,我们将再次使用`TemplateTabbedApp`类创建带有两个 PixieApps 的标签布局: * `TweetInsightApp`:让用户指定查询字符串并显示与之关联的实时仪表板 * `StreamingQueriesApp`:监视活动的结构化查询的进度 在`TweetInsightApp`的默认路由中,我们返回一个片段,该片段向用户询问查询字符串,如下所示: ```py from pixiedust.display.app import * @PixieApp class TweetInsightApp(): @route() def main_screen(self): return """
""" TweetInsightApp().run() ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode20.py)。 以下屏幕截图显示了运行上述代码的结果: ### 注意 **注意**:我们将创建具有选项卡式布局的主`TwitterSentimentApp` PixieApp,并在本节稍后部分中包含此类。 目前,我们仅显示`TweetInsightApp`子应用。 ![Creating the PixieApp](img/00155.jpeg) Twitter 情感仪表板的欢迎屏幕 在`Go`按钮中,我们使用用户提供的查询字符串调用`search_query`路由。 在此路由中,我们首先启动各种流,并从 Parquet 数据库所在的输出目录中创建一个存储在名为`parquet_df`的类变量中的批量`DataFrame`。 然后我们返回由三个小部件组成的 HTML 片段,其中显示了以下指标: * 实体聚集的三种情感中的每一种的条形图 * 折线子图按情感分布显示了的推文 * 实体的词云 每个小部件都使用第 5 章,“最佳做法和高级 PixieDust 概念”中记录的`pd_refresh_rate`属性,定期调用特定的路由。 我们还确保重新加载`parquet_df`变量以获取自上次以来到达的新数据。 然后在`pd_entity`属性中引用此变量以显示图表。 以下代码显示了`search_query`路由的实现: ```py import time [[TweetInsightApp]] @route(search_query="*") def do_search_query(self, search_query): streams_manager.reset(search_query) start_parquet_streaming_query(streams_manager.csv_sdf) while True: try: parquet_dir = os.path.join(root_dir, "output_parquet") self.parquet_df = spark.sql("select * from parquet.'{}'".format(parquet_dir)) break except: time.sleep(5) return """
print("Number of tweets received: {}".format(streams_manager.twitter_stream.listener.tweet_count))
""" ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode21.py)。 从前面的代码中有多个注意事项: * 当我们尝试加载`parquet_df`批量`DataFrame`时,Parquet 文件的输出目录可能未准备好,这会导致异常。 为了解决此时序问题,我们将代码包装到`try...except`语句中,并使用`time.sleep(5)`等待 5 秒钟。 * 我们还将在标题中显示当前的推文计数。 为此,我们添加一个`
`元素,该元素每 5 秒刷新一次,并添加一个``,该元素使用`streams_manager.twitter_stream.listener.tweet_count`来打印当前的推文计数,该变量是我们添加到 `RawTweetsListener`类。 我们还更新了`on_data()`方法,以在每次有新的推文到达时增加`tweet_count`变量,如以下代码所示: ```py [[TweetInsightApp]] def on_data(self, data): def transform(key, value): return transforms[key](value) if key in transforms else value data = self.enrich(json.loads(data)) if data is not None: self.tweet_count += 1 self.buffered_data.append( {key:transform(key,value) \ for key,value in iteritems(data) \ if key in fieldnames} ) self.flush_buffer_if_needed() return True ``` 另外,为避免闪烁,我们防止在`
`元素中使用`class="no_loading_msg"`显示*加载微调器*图像。 * 我们调用三个不同的路由(`display_metric1`,`display_metric2`和`display_wc`),分别显示三个小部件。 `display_metric1`和`display_metric2`路由非常相似。 他们返回一个带有`parquet_df`作为`pd_entity`的 DIV 和一个自定义``子元素,该子元素包含传递给 PixieDust `display()`层的 JSON 配置。 以下代码显示了`display_metric1`路由的实现: ```py [[TweetInsightApp]] @route(display_metric1="*") def do_display_metric1(self, display_metric1): parquet_dir = os.path.join(root_dir, "output_parquet") self.parquet_df = spark.sql("select * from parquet.'{}'".format(parquet_dir)) return """
{ "legend": "true", "keyFields": "sentiment", "clusterby": "entity_type", "handlerId": "barChart", "rendererId": "bokeh", "rowCount": "10", "sortby": "Values DESC", "noChartCache": "true" }
""" ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode22.py)。 `display_metric2`路由遵循相似的模式,但具有`pd_options`属性不同的集合。 最后一条路由是`display_wc`,负责为实体显示单词 cloud。 此路由使用`wordcloud` Python 库,您可以通过以下命令进行安装: ```py !pip install wordcloud ``` ### 注意 **注意**:与往常一样,安装完成后,不要忘记重新启动内核。 我们使用第 5 章,“最佳做法和高级 PixieDust 概念中记录的`@captureOutput`装饰器,如下所示: ```py import matplotlib.pyplot as plt from wordcloud import WordCloud [[TweetInsightApp]] @route(display_wc="*") @captureOutput def do_display_wc(self): text = "\n".join( [r['entity'] for r in self.parquet_df.select("entity").collect() if r['entity'] is not None] ) plt.figure( figsize=(13,7) ) plt.axis("off") plt.imshow( WordCloud(width=750, height=350).generate(text), interpolation='bilinear' ) ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode23.py)。 传递给`WordCloud`类的文本是通过收集`parquet_df`批量`DataFrame`中的所有实体而生成的。 以下屏幕快照显示了让通过搜索查询`baseball`创建的 Twitter 流运行一段时间后的仪表板: ![Creating the PixieApp](img/00156.jpeg) Twitter 情感仪表板的搜索查询“棒球” 第二个 PixieApp 用于监视正在运行的流查询。 主路由返回一个 HTML 片段,该片段具有一个`
`元素,该元素以固定间隔(5000 ms)调用`show_progress`路由,如以下代码所示: ```py @PixieApp class StreamingQueriesApp(): @route() def main_screen(self): return """
""" ``` ### 注意 [您可以在此处找到代码文件](https://github.com/DTAIEB/Thoughtful-Data-Science/blob/master/chapter%207/sampleCode24.py)。 在`show_progress`路由中,我们使用本章前面介绍的`query.lastProgress`监视 API,使用 Jinja2 `{%for%}`循环遍历 JSON 对象,并在表中显示结果,如以下代码所示: ```py @route(show_progress="true") def do_show_progress(self): return """ {%for query in this.spark.streams.active%}