InfoSphere Streams——实时大数据分析平台
了解 ,它是 IBM 大数据平台的一部分。 解决了针对能够实时处理生成的海量流数据的平台和架构的一种迫切需求。了解该产品的设计目标,它适用于哪些时机,其工作原理,以及它如何为 InfoSphere BigInsights 提供补充来执行高度复杂的分析。
来自多个来源的信息正在以难以置信的速度增长。互联网用户数量在 2015 年已经达到 22.7 亿。每一天,Twitter 都会生成超过 12 TB 的 tweet,Facebook 生成超过 25 TB 日志数据,纽约证券交易所采集 1 TB 交易信息。每天会创建大约 300 亿个射频识别 (RFID) 标记。此外,每年销售的数亿台 GPS 设备,目前正在使用的超过 3000 万个连网的传感器(而且每年在以高于 30% 的速度增长),都在产生数据。这些数据量预计在未来 10 年中每 2 年就会翻一番。
一家公司在一年时间内可生成高达数 PB 的信息:网页、博客、单击流、搜索索引、社交媒体论坛、即时消息、文本消息、电子邮件、文档、用户人口统计数据、来自主动和被动系统的传感器数据,等等。许多人估计,这些数据中高达 80% 都是半结构化或非结构化数据。公司一直在寻求更加敏捷地经营业务,以更加创新的方式执行数据分析和决策流程。而且他们认识到,这些流程中损失的时间可能导致错失业务机会。挑战的核心是,公司掌握轻松地分析和理解互联网级信息的能力,就像他们现在可分析和理解较少量结构化信息一样。
IBM 正在帮助公司应对大数据挑战,为他们提供工具来集成和管理海量、高速产生的数据,应用原生格式的分析,可视化可用数据以进行专门分析,等等。本文将介绍 InfoSphere Streams,该技术支持您同时分析许多数据类型并实时执行复杂计算。您将了解 InfoSphere Streams 的工作原理,它的用途,以及如何结合使用它与另一个用于的 IBM 产品(IBM InfoSphere BigInsights)来执行高度复杂的分析。
InfoSphere BigInsights:概述
理解 InfoSphere BigInsights 将会使您能够更全面地理解 InfoSphere Streams 的用途和价值。
BigInsights 是一个分析平台,可帮助公司将复杂的互联网级信息集转换为洞察。它包含一个套装的 Apache Hadoop 发行版(具有高度简化的安装流程)和用于应用程序开发、数据移动和集群管理的关联工具。得益于简单性和可伸缩性,Hadoop(MapReduce 框架的一种开源实现)在行业和学术界获得的巨大的成功。除了 Hadoop 之外,BigInsights 中的其他开源技术(除 Jaql 外的所有技术都属于 Apache Software Foundation 项目)包括:
- Pig:该平台提供了一种高级语言来表达分析大数据集的程序。Pig 配备了一个编译器,可将 Pig 程序转换为 Hadoop 框架执行的 MapReduce 作业序列。
- Hive:一个构建于 Hadoop 环境之上的数据仓库解决方案。它为 Hadoop 的非结构化世界带来了人们熟悉的关系数据库概念,比如表、列和分区,以及 SQL 的一个子集 (HiveQL)。Hive 查询被编译为使用 Hadoop 执行的 MapReduce 作业。
- Jaql:IBM 专为 JSON(JavaScript Object Notation,JavaScript 对象表示法)开发的一种查询语言,提供了一种类似 SQL 的接口。Jaql 适度地处理嵌套,高度面向函数,而且非常灵活。它适用于松散的结构化数据,是 HBase 列存储和文本分析的接口。
- HBase:一个面向列的 NoSQL 数据存储环境,旨在支持 Hadoop 中大型、稀疏填充的表。
- Flume:一种分布式、可靠且可用的服务,用于高效地移动生成的大量数据。Flume 非常适合从多个系统中收集生成的日志,在它们插入 HDFS(Hadoop Distributed File System,Hadoop 分布式文件系统)。
- Lucene:一个搜索引擎库,提供了高性能的、全功能的文本搜索。
- Avro:一种数据序列化技术,使用 JSON 来定义数据类型和协议,以一种紧凑的二进制格式对数据执行序列化。
- ZooKeeper:一种维护配置信息和命名,提供分布式同步和分组服务的集中化服务。
- Oozie:一个工作流调度程序系统,用于管理和编排 Apache Hadoop 作业的执行过程。
此外,BigInsights 发行版还包含以下 IBM 独有的技术:
- BigSheets:一种基于浏览器、类似电子表格的查询和探索接口,使业务用户能够轻松地收集和分析数据,利用 Hadoop 的强大功能。它提供了内置的阅读器,可处理多种常见格式的数据,包括 JSON、逗号分隔值 (CSV) 和制表符分隔值 (TSV)。
- Text analytics:常见业务实体的文本注释符的一个预先构建的库。它提供了丰富的语言和工具来构建自定义位置注释符。
- Adaptive MapReduce:一个 IBM Research 解决方案,通过更改 MapReduce 任务的处理方式来加速小型 MapReduce 作业的执行。
InfoSphere 平台
是一个综合性的信息集成平台,包含数据仓库和分析、信息集成、主数据管理、生命周期管理,以及数据安全和隐私。该平台改进了应用程序开发流程,所以组织可以加快价值实现速度,减少集成成本,并提高信息质量。
一般来讲,BigInsights 的设计并不是为了取代一种传统的关系数据库管理系统 (DBMS) 或传统的数据仓库。具体来讲,它没有针对对表列数据结构的交互式查询、在线分析处理 (OLAP) 或在线事务处理 (OLTP) 应用程序而优化。但是,作为 IBM 大数据平台的组成部分,BigInsights 提供了与该平台的其他组件(包括数据仓库、数据集成和治理引擎)和第三方数据分析工具的潜在集成点。在本文后面将会看到,它还可与 InfoSphere Streams 集成。
流计算:一种新的计算模式
流计算是新的数据声场场景所不可或缺的一种新计算模式,比如无处不在的移动设备、位置服务和遍布各处的传感器。人们需要可伸缩的计算平台和并行架构来处理生成的海量流数据。
BigInsights 技术不足以支持实时流处理任务,因为它们主要面向对静态数据的批处理的支持。在处理静态数据的过程中,列出所有已连网的用户这样的查询会得到单一的结果集。借助对流数据的实时处理,您可执行一种持续查询,比如列出在过去 10 分钟内连网的所有用户。此查询将返回持续更新的结果。在静态数据领域中,用户犹如在干草堆中捞针;在流数据领域中,用户可轻松地找到这颗针,因为干草已被吹走。
InfoSphere Streams 平台支持流数据的实时处理,支持不断更新持续查询的结果,可在仍在移动的数据流中检测洞察。
InfoSphere Streams 概述
InfoSphere Streams 旨在从一个几分钟到几小时的窗口中的移动信息(数据流)中揭示有意义的模式。该平台能够获取低延迟洞察,并为注重时效的应用程序(比如欺诈检测或网络管理)获取更好的成果,从而提供业务价值。InfoSphere Streams 还可合并多个流,使您能够从多个流中获取新洞察,如图 3 所示。
图 3. 合并的流处理
InfoSphere Streams 的主要设计目的是:
- 快速响应事件和不断变化的业务条件与需求。
- 支持以比现有系统快几个数量级的速度对数据执行持续分析。
- 快速适应不断变化的数据形式和类型。
- 管理新的流模式的高可用性、异构性和分布。
- 为共享的信息提供安全性和信息机密性。
InfoSphere Streams 提供了一种编程模型和 IDE 来定义数据来源,还提供了已融合到处理执行单元中的称为运算符的软件分析模块。它还提供了基础架构来支持从这些组件合成可扩展的流处理应用程序。主要平台组件包括:
- 运行时环境:这包括平台服务,以及一个用于在单个主机或一组集成的主机上部署和监视 Streams 应用程序的调度程序。
- 编程模型:您可使用 SPL(Streams Processing Language,流处理语言,一种声明性语言)来编写 Streams 应用程序。可使用该语言陈述您的需求,运行时环境会承担确定如何最佳地服务该请求的责任。在此模型中,一个 Streams 应用程序表示为一个由运算符和连接它们的流组成的图表。
- 监视工具和管理接口:Streams 应用程序处理数据的速度比普通的操作系统监视实用程序快得多。InfoSphere Streams 提供了可处理此环境的工具。
流处理语言
SPL,InfoSphere Streams 的编程语言,是一种分布式数据流合成语言。它是一种类似 C++ 或 Java™ 的可扩展且全功能的语言,支持用户定义的数据类型。您可以使用 SPL 或原生语言(C++ 或 Java)编写自定义函数。也可以使用 C++ 或 Java 编写用户定义的运算符。
InfoSphere Streams 持续应用程序会描述一个导向图,该图由各个互联且处理多个数据流的运算符组成。数据流可来自系统外部,或者在应用程序内部生成。SPL 程序的基本构建块包括:
- 流:一个无限的结构化元组序列。它可逐个元组地由运算符使用或通过一个窗口的定义来使用。
- 元组:属性及其类型的一个结构化列表。流上的每个元组拥有由其流类型指定的形式。
- 流类型:指定元组中每个属性的名称和数据类型。
- 窗口:一个有限、有序的元组分组。它可以基于计数、时间、属性值或标点符号。
- 运算符:SPL 的基础构建块,它的运算符会处理来自流的数据并可生成新流。
- 处理元素 (PE):基础执行单元。一个 PE 可封装单个运算符或多个合并的运算符。
- 作业:一个已部署好的用来执行的 Streams 应用程序。它由一个或多个 PE 组成。除了一组 PE 之外,SPL 编译器还会生成一个 ADL(Application Description Language,应用程序描述语言)文件来描述应用程序的结构。该 ADL 文件包含每个 PE 的详细信息,比如要加载和执行哪个二进制文件,调度限制、流格式和一个内部运算符数据流图。
图 4 演示了 SPL 程序的 InfoSphere Streams 运行时视图:
图 4. InfoSphere 运行时执行
一个运算符表示一个可重用的流转换器,将一些输入流转换为输出流。在 SPL 程序中,运算符调用可实现预算法的特定用途,使用分配的特定的输入和输出流,以及在本地指定的参数和逻辑。每次运算符调用都会对输入和输出流命名。各种内置的 InfoSphere Streams 运算符提供了许多强大的功能:
Source
:读取流格式的输入数据。Sink
:将输出流的数据写入外部存储或系统中。Functor
:过滤、转换和对输入流的数据执行各种功能。Sort
:对定义的键上的流数据排序。Split
:将输入流数据拆分为多个输出流。Join
:合并定义的键上的输入流数据。Aggregate
:聚合定义的键上的流数据。Barrier
:组合和匹配流数据。Delay
:演示一个流数据流。Punctor
:识别应一起处理的数据分组。
一个流连接到一个运算符的位置称为端口。许多运算符(例如 Functor
)有一个输入端口和一个输出端口,但运算符也可以没有输入端口(比如 Source
)和没有输出端口(比如 Sink
),或者拥有多个输入或输出端口(比如 Split
和 Join
)。清单 1 给出了 Sink
的一个 SPL 示例,它有一个输入端口并将输出元组写入到一个磁盘文件中。
清单 1. Sink
示例
() as Sink = FileSink(StreamIn) { param file : "/tmp/people.dat"; format : csv; flush : 20u; }
在 清单 1 中,file
是一个强制性参数,提供了输出文件的路径。flush
参数用于清除给定数量的元组后的输出。format
参数指定了输出文件的格式。
组合运算符是一个运算符集合。它表示对原始(非组合)运算符或组合(嵌套)运算符的一个子图的一种封装。它类似于过程语言中的宏。
一个应用程序由一个没有输入或输出端口的主要组合运算符表示。数据可流入和流出,但不会流到一个图表内的流上,而且流可导出到在同一个实例中运行的其他应用程序和从这些应用程序导入。清单 2 中的代码给出了主要组合运算符的框架。
清单 2. 主要组合运算符的结构
composite Main { graph stream ... { } stream ... { } ... }
作为一个示例,我们来看一个简单的流应用程序 WordCount,它统计一个文件中的行数和字数。该程序由以下流图组成:
- 一个
Source
预算法调用,读取一个文件并将各行发送给数据流。 - 一个
Functor
运算符调用,统计行数和每个数据行的字数,将统计数据发送给它的输出流。 - 一个
Counter
运算符调用,聚合文件中所有行的统计数据并打印在末尾。
在介绍 WordCount 的主要组合运算符之前,我将定义一些帮助器。我将为一行的统计数据使用 LineStat
类型。此外,我需要构建一个 countWords(rstring line)
函数来统计一行中的字数,需要使用一个 addM(mutable LineStat x, LineStat y)
函数来添加两个 LineStat
值并存储结果。清单 3 定义了这些帮助器。
清单 3. WordCount 帮助器定义
type LineStat = tuple<int32 lines, int32 words>; int32 countWords(rstring line) { return size(tokenize(line, " \t", false)); } void addM(mutable LineStat x, LineStat y) { x.lines += y.lines; x.words += y.words; }
现在可以定义主要组合运算符了,如清单 4 所示。
清单 4. WordCount 的主要组合运算符
composite WordCount { graph stream<rstring line> Data = FileSource() { param file : getSubmissionTimeValue("file"); format : line; } stream<LineStat> OneLine = Functor(Data) { output OneLine : lines = 1, words = countWords(line); } () as Counter = Custom(OneLine) { logic state : mutable LineStat sum = { lines = 0, words = 0 }; onTuple OneLine : addM(sum, OneLine); onPunct OneLine : if (currentPunct() == Sys.FinalMarker) println(sum); } }
开发环境
InfoSphere Streams 提供了一个敏捷开发环境,该环境由 Eclipse IDE、Streams Live Graph 视图和一个流调试器组成。该平台还包含用于加速和简化特定功能或行业的解决方案开发的工具包:
- 标准工具包:包含随产品发布的默认运算符:
- 关系运算符,比如
Filter
、Sort
、Functor
、Join
、Punctor
和Aggregate
- 适配器 运算符,比如
FileSource
、FileSink
、DirectoryScan
和Export
- 实用程序运算符,比如
Custom Split
、DeDuplicate
、Throttle
、Union
、Delay
、ThreadedSplit
、Barrier
和DynamicFilter
- 关系运算符,比如
- 互联网工具包:包括
HTTP
、FTP
、HTTPS
、FTPS
和RSS
等运算符。 - 数据库工具包:支持 DBMS,包括 DB2®、Netezza、Oracle Database、SQL Server 和 MySQL。
- 其他内置工具包:金融、数据挖掘、大数据和文本工具包。
此外,您可定义您自己的工具包,提供可重用的运算符和函数集,并创建跨领域和特定于领域的加速器。它们可包含原始和组合运算符,也可同时使用原生和 SPL 函数。
BigInsights 和 InfoSphere Streams 之间的集成和交互
不断从系统中生成大量宝贵数据的公司正面临为以下两个重要用途而分析数据的问题困扰:及时感知和响应当前事件,根据历史知识进行预测,从而指导响应。这一情形产生了无缝运行移动数据(当前数据)和静止数据(历史数据)分析、处理海量、多样性、高速产生的数据的需求。IBM 的移动数据 (InfoSphere Streams) 与静止数据 (BigInsights) 平台的集成解决了 3 个主要应用场景的需求:
- 可伸缩的数据获取:通过 Streams 持续将数据获取到 BigInsights 中。例如,通常需要获取来自社交媒体来源(比如 Twitter 和 Facebook)的非结构化文本数据,以提取各种类型的态度和线索。在这种情况下,如果文本提取在获取数据时执行,那么尽早消除垃圾邮件等无关数据会让效率高很多。这种集成使公司能够避免巨额的非必要存储成本。
- 加速和充实:从 BigInsights 生辰历史上下文来加速分析和充实传入的 Streams 数据。BigInsights 可用于分析在较长的时间窗口内从各种连续和静态的数据来源吸收和集成的数据。此分析的结果为各种在线分析提供了上下文,可用于将它们引导至一种已知状态。回到社交媒体应用程序的场景,一条传入的 Twitter 消息仅拥有发布该消息的人的 ID。但是,历史数据可通过属性(比如影响者)充实该信息,为执行下游分析以适当应对此用户所表达的态度提供机会。
- 自适应分析模型:BigInsights 上的分析操作(比如数据挖掘、机器学习或统计建模)所生成的模型。这些可用作分析 Streams 上的传入数据的基础,基于实时观察结果而更新。
IBM 大数据平台的移动数据和静止数据部分可通过 3 种主要的组件类型来集成:
- 通用分析:相同的分析功能可用在 Streams 和 BigInsights 上。
- 通用数据格式:Streams 格式运算符可在 Streams 元组格式和 BigInsights 使用的数据格式之间转换数据。
- 数据交换适配器:Streams
Source
和Sink
适配器可用于与 BigInsights 交换数据。
结束语
帮助公司管理、分析和利用大数据是 IBM 大数据平台的主要关注领域。本文介绍了 InfoSphere Streams,它是 IBM 用来存储和分析移动数据(流数据)的软件平台。本文还概述了如何集成 InfoSphere Streams 与 BigInsights,它们是 IBM 用来存储和分析静止数据的软件平台,以便充实实现更复杂分析的能力。许多公司认识到,充分利用大数据是提供独特的业务价值和优势的一个重要的信息管理手段。如果您已准备好使用 InfoSphere streams,请参见 参考资料,获取免费的培训材料和软件。
详情请咨询!
客服热线:023-66090381