InfoSphere Streams 旨在从一个几分钟到几小时的窗口中的移动信息(数据流)中揭示有意义的模式。该平台能够获取低延迟洞察,并为注重时效的应用程序(比如欺诈检测或网络管理)获取更好的成果,从而提供业务价值。InfoSphere Streams 还可合并多个流,使您能够从多个流中获取新洞察,如图 3 所示。
图 3. 合并的流处理
InfoSphere Streams 的主要设计目的是:
- 快速响应事件和不断变化的业务条件与需求。
- 支撑以比现有系统快几个数量级的速度对数据实行持续分析。
- 快速适应不断变化的数据形式和类型。
- 管理新的流模式的高可用性、异构性和分布。
- 为共享的信息提供安全性和信息机密性。
InfoSphere Streams 提供了一种编程模型和 IDE 来定义数据来源,还提供了已融合到处理实行单元中的称为运算符的App分析模块。它还提供了基础架构来支撑从这些组件合成可扩展的流处理应用程序。主要平台组件包括:
- 运行时环境:这包括平台服务,以及一个用于在单个主机或一组集成的主机上部署和监视 Streams 应用程序的调度程序。
- 编程模型:您可使用 SPL(Streams Processing Language,流处理语言,一种声明性语言)来编写 Streams 应用程序。可使用该语言陈述您的需求,运行时环境会承担确定如何最佳地服务该请求的责任。在此模型中,一个 Streams 应用程序表示为一个由运算符和连接它们的流组成的图表。
- 监视工具和管理接口:Streams 应用程序处理数据的速度比普通的操作系统监视实用程序快得多。InfoSphere Streams 提供了可处理此环境的工具。
流处理语言 SPL,InfoSphere Streams 的编程语言,是一种分布式数据流合成语言。它是一种类似 C++ 或 Java™ 的可扩展且全功能的语言,支撑用户定义的数据类型。您可以使用 SPL 或原生语言(C++ 或 Java)编写自定义函数。也可以使用 C++ 或 Java 编写用户定义的运算符。 InfoSphere Streams 持续应用程序会描述一个导向图,该图由各个互联且处理多个数据流的运算符组成。数据流可来自系统外部,或者在应用程序内部生成。SPL 程序的基本构建块包括:
- 流:一个无限的结构化元组序列。它可逐个元组地由运算符使用或通过一个窗口的定义来使用。
- 元组:属性及其类型的一个结构化列表。流上的每个元组拥有由其流类型指定的形式。
- 流类型:指定元组中每个属性的名称和数据类型。
- 窗口:一个有限、有序的元组分组。它可以基于计数、时间、属性值或标点符号。
- 运算符:SPL 的基础构建块,它的运算符会处理来自流的数据并可生成新流。
- 处理元素 (PE):基础实行单元。一个 PE 可封装单个运算符或多个合并的运算符。
- 作业:一个已部署好的用来实行的 Streams 应用程序。它由一个或多个 PE 组成。除了一组 PE 之外,SPL 编译器还会生成一个 ADL(Application Description Language,应用程序描述语言)文件来描述应用程序的结构。该 ADL 文件包含每个 PE 的详细信息,比如要加载和实行哪个二进制文件,调度限制、流格式和一个内部运算符数据流图。
图 4 演示了 SPL 程序的 InfoSphere Streams 运行时视图:
图 4. InfoSphere 运行时实行
一个运算符表示一个可重用的流转换器,将一些输入流转换为输出流。在 SPL 程序中,运算符调用可实现预算法的特定用途,使用分配的特定的输入和输出流,以及在本地指定的参数和逻辑。每次运算符调用都会对输入和输出流命名。各种内置的 InfoSphere Streams 运算符提供了许多强大的功能:
- Source:读取流格式的输入数据。
- Sink:将输出流的数据写入外部存储或系统中。
- Functor:过滤、转换和对输入流的数据实行各种功能。
- Sort:对定义的键上的流数据排序。
- Split:将输入流数据拆分为多个输出流。
- Join:合并定义的键上的输入流数据。
- Aggregate:聚合定义的键上的流数据。
- Barrier:组合和匹配流数据。
- Delay:演示一个流数据流。
- Punctor:识别应一起处理的数据分组。
[size=1.166em]一个流连接到一个运算符的位置称为端口。许多运算符(例如 Functor)有一个输入端口和一个输出端口,但运算符也可以没有输入端口(比如 Source)和没有输出端口(比如 Sink),或者拥有多个输入或输出端口(比如 Split 和 Join)。清单 1 给出了 Sink 的一个 SPL 示例,它有一个输入端口并将输出元组写入到一个磁盘文件中。 清单 1. Sink 示例() as Sink = FileSink(StreamIn) { param file : "/tmp/people.dat"; format : csv; flush : 20u;}
[size=1.166em]在 清单 1 中,file 是一个强制性参数,提供了输出文件的路径。flush 参数用于清除给定数量的元组后的输出。format 参数指定了输出文件的格式。 [size=1.166em]组合运算符是一个运算符集合。它表示对原始(非组合)运算符或组合(嵌套)运算符的一个子图的一种封装。它类似于过程语言中的宏。 [size=1.166em]一个应用程序由一个没有输入或输出端口的主要组合运算符表示。数据可流入和流出,但不会流到一个图表内的流上,而且流可导出到在同一个实例中运行的其他应用程序和从这些应用程序导入。清单 2 中的代码给出了主要组合运算符的框架。 清单 2. 主要组合运算符的结构composite Main { graph stream ... { } stream ... { } ...}
[size=1.166em]作为一个示例,大家来看一个简单的流应用程序 WordCount,它统计一个文件中的行数和字数。该程序由以下流图组成:
- 一个 Source 预算法调用,读取一个文件并将各行发送给数据流。
- 一个 Functor 运算符调用,统计行数和每个数据行的字数,将统计数据发送给它的输出流。
- 一个 Counter 运算符调用,聚合文件中所有行的统计数据并打印在末尾。
[size=1.166em]在先容 WordCount 的主要组合运算符之前,我将定义一些帮助器。我将为一行的统计数据使用 LineStat 类型。此外,我需要构建一个countWords(rstring line) 函数来统计一行中的字数,需要使用一个 addM(mutable LineStat x, LineStat y) 函数来添加两个LineStat 值并存储结果。清单 3 定义了这些帮助器。 清单 3. WordCount 帮助器定义type LineStat = tuple<int32 lines, int32 words>; int32 countWords(rstring line) { return size(tokenize(line, " \t", false)); } void addM(mutable LineStat x, LineStat y) { x.lines += y.lines; x.words += y.words; }
[size=1.166em]现在可以定义主要组合运算符了,如清单 4 所示。 清单 4. WordCount 的主要组合运算符composite WordCount { graph stream<rstring line> Data = FileSource() { param file : getSubmissionTimeValue("file"); format : line; } stream<LineStat> OneLine = Functor(Data) { output OneLine : lines = 1, words = countWords(line); } () as Counter = Custom(OneLine) { logic state : mutable LineStat sum = { lines = 0, words = 0 }; onTuple OneLine : addM(sum, OneLine); onPunct OneLine : if (currentPunct() == Sys.FinalMarker) println(sum); } }
开发环境 [size=1.166em]InfoSphere Streams 提供了一个敏捷开发环境,该环境由 Eclipse IDE、Streams Live Graph 视图和一个流调试器组成。该平台还包含用于加速和简化特定功能或行业的解决方案开发的工具包:
- 标准工具包:包含随产品发布的默认运算符:
- 关系运算符,比如 Filter、Sort、Functor、Join、Punctor 和 Aggregate
- 适配器 运算符,比如 FileSource、FileSink、DirectoryScan 和 Export
- 实用程序运算符,比如 Custom Split、DeDuplicate、Throttle、Union、Delay、ThreadedSplit、Barrier 和DynamicFilter
- 互联网工具包:包括 HTTP、FTP、HTTPS、FTPS 和 RSS 等运算符。
- 数据库工具包:支撑 DBMS,包括 DB2®、Netezza、Oracle Database、SQL Server 和 MySQL。
- 其他内置工具包:金融、数据挖掘、大数据和文本工具包。
此外,您可定义您自己的工具包,提供可重用的运算符和函数集,并创建跨领域和特定于领域的加速器。它们可包含原始和组合运算符,也可同时使用原生和 SPL 函数。
|