Greenplum MapReduce规范
Greenplum MapReduce规范
MapReduce是谷歌开发的一个编程模型,用于在普通服务器阵列上处理和生成大量的数据集。Greenplum MapReduc允许熟悉MapReduce的程序员写map和reduce函数同时提交它们到Greenplum数据库并行引擎上用于处理。
为了能让Greenplum处理MapReduce函数,需要在一个文档中定义函数,然后将该文档传递给Greenplum的MapReduce程序,gpmapreduce,由Greenplum数据库并行引擎执行。Greenplum数据库系统会将输入的数据进行分布、在一系列的机器上执行程序、处理机器故障以及管理所需的内部机器间的通信。
关于 gpmapreduce的信息请见Greenplum数据库工具指南。
Greenplum MapReduce文档格式
这部分解释一些Greenplum MapReduce文档格式的基本知识来帮助用户开始创建自己的Greenplum MapReduce文档。Greenplum使用YAML 1.1文档格式同时对于定义一个MapReduce作业的各个步骤有自己的模式实现。
所有Greenplum MapReduce文件必须首先定义它们使用的YAML规范的版本。在此之后,三个破折号(---)表示一个文档的是开始以及三个点号 (...) 指明一个不需要启动新文档的文档的结束。只是行以一个英镑符号(#)为前缀。也可以在相同的文件中声明多个Greenplum MapReduce文档:
%YAML 1.1 --- # Begin Document 1 # ... --- # Begin Document 2 # ...
在一个Greenplum MapReduce 文档中,有三种数据结构或者节点的基本类型:标量、序列 以及映射。
标量是一个用空格缩进的文本串。如果有一个跨越多行的标量的输入,一个前置竖线(|)会表示一种literal样式,在其中所有的换行都是有意义的。或者,用前置的前括号(>)为后续具有相同缩进级别的行把单个换行符折叠为空格。如果一个字符串包含具有保留意义的字符,字符串必须被引用或者特殊字符必须使用反斜线(\)转义。
# 逐字地读取每行 somekey: | this value contains two lines and each line is read literally # 将每个新的行看做一个空格 anotherkey: > this value contains two lines but is treated as one continuous line # 该字符串引用包含了一个特殊字符 ThirdKey: "This is a string: not a mapping"
序列是一个列表,列表中每个项在它们自己的行中以一个破折号和一个空格(- )来指示。或者,用户可以指定一个内联序列作为一个方括号内部逗号分隔的列表。一个序列提供了一个数据集合同时给定它们之间的顺序。当用户装载一个列表到Greenplum MapReduce程序中时,该顺序会被保留。
# 列表序列 - this - is - a list - with - five scalar values # 内联序列 [this, is, a list, with, five scalar values]
映射被用来打包数据的值和被称为keys的标识符。映射为每个键: 值对使用一个冒号和空格(: ),或者也可以被指定内联为一个花括号内由逗号分隔的列表。键用来作为从映射中获取数据的索引。
# 多个项目的一个映射 title: War and Peace author: Leo Tolstoy date: 1865 # 用内联表示该映射 {title: War and Peace, author: Leo Tolstoy, date: 1865}
键被用来连接每个节点和元数据信息以及指定期望的节点类型(scalar、sequence或者mapping)。见Greenplum MapReduce文档模式获取关于Greenplum MapReduce程序期待的键。
Greenplum MapReduce 程序安好走啊顺序处理一个文档的节点同时使用缩进(空格)来决定文档的层级和节点见得关系。空格的使用非常重要。不应该简单的使用空格作为格式化的目的,制表符不应该被使用。
Greenplum MapReduce文档模式
Greenplum MapReduce使用YAML文档架构同时实现了自己的YAML模式。一个Greenplum MapReduce文档的基本结构为:
%YAML 1.1 --- VERSION: 1.0.0.2 DATABASE: dbname USER: db_username HOST: master_hostname PORT: master_port
DEFINE: - INPUT: NAME: input_name FILE: - hostname:/path/to/file GPFDIST: - hostname:port/file_pattern TABLE: table_name QUERY: SELECT_statement EXEC: command_string COLUMNS: - field_name data_type FORMAT: TEXT | CSV DELIMITER: delimiter_character ESCAPE: escape_character NULL: null_string QUOTE: csv_quote_character ERROR_LIMIT: integer ENCODING: database_encoding
- OUTPUT: NAME: output_name FILE: file_path_on_client TABLE: table_name KEYS: - column_name MODE: REPLACE | APPEND
- MAP: NAME: function_name FUNCTION: function_definition LANGUAGE: perl | python | c LIBRARY: /path/filename.so PARAMETERS: - nametype RETURNS: - nametype OPTIMIZE: STRICT IMMUTABLE MODE: SINGLE | MULTI
- TRANSITION | CONSOLIDATE | FINALIZE: NAME: function_name FUNCTION: function_definition LANGUAGE: perl | python | c LIBRARY: /path/filename.so PARAMETERS: - nametype RETURNS: - nametype OPTIMIZE: STRICT IMMUTABLE MODE: SINGLE | MULTI
- REDUCE: NAME: reduce_job_name TRANSITION: transition_function_name CONSOLIDATE: consolidate_function_name FINALIZE: finalize_function_name INITIALIZE: value KEYS: - key_name
- TASK: NAME: task_name SOURCE: input_name MAP: map_function_name REDUCE: reduce_function_name EXECUTE
- RUN: SOURCE: input_or_task_name TARGET: output_name MAP: map_function_name REDUCE: reduce_function_name...
- VERSION
-
要求。Greenplum MapReduce YAML规范的版本。当前版本为1.0.0.1.
- DATABASE
- 可选。指定要连接到Greenplum中哪个数据库。如果没有指定,默认连接到默认的数据库或者$PGDATABASE(如果该值被设置了)。
- USER
- 可选。指定使用哪个数据库角色来连接。如果没有指定,默认使用当前用户,或者$PGUSER(如果该值被设置)。用户必须为Greenplum的超级用户,如果要运行的函数是由不可行的Python或者Perl写的。普通数据库用户能运行由可信Perl写的函数。当用户运行的MapReduce作业包含了FILE、GPFDIST以及EXEC输入类型,那么用户也必须是数据库的超级用户。
- HOST
- 可选。指定Greenplum的Master主机的名称。如果没有指定,默认为localhost 或者$PGHOST(如果该值被设置了)。
- PORT
- 可选。指定Greenplum的Master节点的端口。如果没有指定,默认端口为5432或者$PGPORT(如果该值被设置)。
- DEFINE
- 要求。为该MapReduce文档的定义的一个序列。DEFINE部分必须至少有一个INPUT定义。
- INPUT
- 要求。定义输入数据。每个MapReduce文档必须至少有一个输入数据定义。在一个文档中允许有多个输入定义,但是每个输入定义只能指定一个访问类型:一个文件、一个gpfdist文件分布程序、一个数据库中的表、一个SQL命令或者一个操作系统的命令。见 Greenplum数据库工具指南获取关于 gpfdist的信息。
- NAME
-
一个该输入的名称。一个名称对比该MapReduce作业中的其它对象(例如map函数、任务、reduce函数以及输出名称)的名称是唯一的。另外,名称也不能同数据库已经存在的对象(例如,表、函数或者视图)名称冲突。
- FILE
- 一个或者多个输入文件的序列,以格式: seghostname:/path/to/filename。用户必须是一个Greenplum数据库的超级用户来运行带有FILE输入的MapReduce作业。文件必须驻留在Greenplum的segment主机上。
- GPFDIST
- 一个或者多个运行的gpfdist文件分发程序的序列,以格式: hostname[:port]/file_pattern。用户必须为Greenplum数据库超级用户运行以GPFDIST作为输入的MapReduce作业,除非服务配置参数服务器配置参数被设置为on。
- TABLE
- 数据库中存在的表的名称
- QUERY
- 一个运行在数据库内的SQL命令SELECT。
- EXEC
- 一个运行在Greenplum的Segment主机上的操作系统命令。该命令默认由系统中所有的Segment实例运行。例如,如果在每个Segment主机上有四个Segment实例,那么该命令会在每个主机上执行四次。必须作为Greenplum数据库的超级用户才能执行带有EXEC输入的MapReduce作业,同时服务配置参数服务器配置参数要设置为on。
- COLUMNS
- 可选。列被指定为: column_name [data_type]。如果没有指定,默认为value text。 The DELIMITER字符用来分隔两个数据域(列)。一个行由一个(a line feed character)行的字符(0x0a)决定。
- FORMAT
-
可选。指定数据的格式 - 或者为分隔的文本(TEXT)或者逗号分隔的值(CSV的格式。如果数据格式没有被指定,默认为TEXT。
- DELIMITER
-
对于FILE、GPFDIST以及EXEC输入是可选的。指定单个字符来分隔数据的值。默认为一个tab字符在TEXT中。分隔字符必须出现在两个数据值的域之间。不用讲分隔符放在一个行的开始或者结束。
- ESCAPE
- 对于FILE、 GPFDIST以及EXEC输入是可选的。指定被用来作为C的转义序列(例如,\n、\t、\100等)的单个字符同时对于转义字符可能被当作行或者列的分隔符。 确保用户选择的转义字符没有在用户实际的列的数据库中使用。对于文本格式文件的默认的转义字符为一个\(反斜线符号),对于csv格式的文件的默认转义字符为一个"(双引号),然后也有可能指定其它的字符来代表转义。也可能通过指定'OFF'为转义值来关闭转义。这对于像文本格式的内嵌有反斜线(此处反斜线的目的不是转义)的网络日志数据是非常有用的。
- NULL
- 对于FILE、GPFDIST以及EXEC输入是可选的。指定代用空值的字符串。在TEXT格式下,默认为\N在CSV格式下,默认为没有引用(quotations)的空值。 甚至在TEXT模式下,用户可能更喜欢一个空字符串,用户不想从空字符串中区别空值。任何能够匹配该字符串输入数据项将被视为空值。
- QUOTE
- 对于FILE、GPFDIST和EXEC输入是可选的。为CSV格式文件指定引用字符的。默认是双引号(")。在CSV格式的文件中,如果它们包含有逗号或者内嵌新的行,则数据值域必须包含在双引号中。包含双引号字符的字段必须用双引号保卫,并且内嵌的双引号必须有一对连续的双引号表示。为了数据行的正确解析,总是打开和关闭引号是非常重要的。
- ERROR_LIMIT
- 在Greenplum的任何Segment实例进行输入处理期间,如果输入行有格式错误,没有达到提供的错误限制计数前,错误都会被丢弃。如果错误限制没有达到,所有好的行将会被处理同时任何错误的行会被丢弃。
- ENCODING
- 用于数据的字符集编码。指定一个字符串常量(例如,'SQL_ASCII')、一个整型编码数字、或者DEFAULT来用于默认的客户端编码。见字符集支持获取更多信息。
- OUTPUT
- 可选。定义该MapReduce作业在哪里输出格式数据。如果输出没有定义,默认为STDOUT(客户端的标准输出)。用户可以发送输出到一个客户端主机上的文件中或者数据库当前存在的一张表中。
- NAME
- 该输出的一个名称。默认名为STDOUT。 一个名称对比该MapReduce作业中的其它对象(例如map函数、任务、reduce函数以及输出名称)的名称是唯一的。另外,名称也不能同数据库已经存在的对象(例如,表、函数或者视图)名称冲突。
- FILE
- 指定在MapReduce客户端机器上一个文件的位置,用于输出数据,格式为:/path/to/filename。
- TABLE
- 在数据库中指定一个表的名称用于输出数据。如果该表在执行MapReduce作业前不存在,那么将通过指定 KEYS的分布策略来创建。
- KEYS
- 对于 TABLE输出是可选的。指定列用于Greenplum数据库的分布键。如果EXECUTE任务包含了一个REDUCE定义,那么该键默认将作为表的分布键。否则表的第一列将用作分布键。
- MODE
- 对于 TABLE输出是可选的。如果没有指定,如果表不存在默认行为为创建一个表,如果存在则发出一个错误。声明APPEND添加数据到一张已经存在的表中(被提供的表模式匹配输出格式)而不需要移除任何已经存在的数据。声明 REPLACE,如果表存在将删除表然后重新创建该表。 APPEND和 REPLACE都会在表不存在的时候创建新表。
- MAP
- 要求。每个MAP函数都接受在(key,value)对中构造数据,处理每对同时生成零个或者更多输出的(key,value)对。Greenplum MapReduce 架构然后收集从所有输出列表中收集具有相同键的对并将他们归类到一起。然后将该输出传递给REDUCE任务,该任务由TRANSITION | CONSOLIDATE | FINALIZE函数组成。有一个预定义的命名为IDENTITY的MAP函数,该函数返回没有变化的(key,value)对。尽管(key,value)是默认的参数,用户也能根据需要指定原型。
- TRANSITION | CONSOLIDATE | FINALIZE
- TRANSITION、CONSOLIDATE以及 FINALIZE是所有组成REDUCE的片段(pieces)。一个TRANSITION函数是必须的。 CONSOLIDATE 以及 FINALIZE 函数是可选的。默认情况下,所有都将state作为它们的输入PARAMETERS的第一个,但是其它原型也可以这样定义。
-
一个TRANSITION 函数遍历给定键的每个值同时在一个state变量中累积值。当一个过渡(transition)函数在一个键的第一个值被调用时,state 的值被设置为通过一个a REDUCE作业的INITALIZE指定的值(或者使用该数据类型默认的state值)。一个过渡函数用两个参数作为输入,当前的键归约的state,之后产生一个新state的下一个值。
- 如果一个CONSOLIDATE函数被指定,在通过Greenplum的Interconnect为最终聚集(两阶段聚集)重新分布键之前,TRANSITION处理会在Segment级别被执行。只有给定键的结果state被重分布时,才会导致较低的Interconnect流量以及较高的并行性。CONSOLIDATE会像TRANSITION一样被处理,不过它不会用(state + value) => state,而是采用(state + state) => state。
- 如果一个FINALIZE函数被指定,它接受由CONSOLIDATE(如何呈现的话)或者
TRANSITION函数产生的最终state同时执行任何在发出最终结果前的所有处理。 TRANSITION和CONSOLIDATE
函数不能返回值的集合。如果用户需要一个REDUCE作业返回一个集合,那么一个FINALIZE是有必要的,它能将最后的state转换为一个输出值的集合。
- NAME
- 要求。一个函数的名称。一个名称对比该MapReduce作业中的其它对象(例如map函数、任务、reduce函数以及输出名称)的名称是唯一的。用户也能指定一个Greenplum数据库内建函数的名称。如果使用内建函数,不需提供LANGUAGE或者一个FUNCTION体。
- FUNCTION
- 可选。通过使用指定的LANGUAGE指定函数的完整的主体。如果 FUNCTION没有指定,那么一个对应NAME的内建数据库函数将会被使用。
- LANGUAGE
- 当 FUNCTION使用时,是要求的。 指定用来解释函数的实现语言。该版本的语言支持有perl、 python以及C。如果调用一个内建数据库函数,LANGUAGE不应该被指定。
- LIBRARY
- 当LANGUAGE是C(对于其它语言是不允许的)时,是要求的。为了使用该属性, VERSION必须为1.0.0.2。指定的库文件一定在执行MapReduce作业前安装好,同时在所有的Greenplum主机(master和segment)上都位于相同的文件系统位置。
- PARAMETERS
- 可选。函数输入参数。默认类型为text。
MAP 默认 - key text, value text
TRANSITION 默认 - state text, value text
CONSOLIDATE 默认 - state1 text, state2 text (必须为恰好相同数据类型的两个输入参数)
FINALIZE 默认 - state text (只有一个参数)
- RETURNS
- 可选。默认的返回类型为text。
MAP 默认 - key text, value text
TRANSITION 默认 - state text (只有一个返回值)
CONSOLIDATE 默认 - state text (只有一个返回值)
FINALIZE 默认 - value text
- OPTIMIZE
- 函数可选的优化参数:
STRICT - 函数不受NULL值的影响。
IMMUTABLE - 对于一个给定的输入函数总是返回相同的值。
- MODE
- 可选。指定函数返回的行的行数。
MULTI - 内个输入记录返回0或者更多的行。函数的返回值一定是一个返回的行数组,或者函数在Python中使用yield写成迭代器或者在Perl中用return_next。MULTI是MAP和FINALIZE函数的默认模式。
SINGLE - 每个输入记录恰好只返回一行。 SINGLE是为一个支持 TRANSITION和CONSOLIDATE函数的模式。当使用MAP和FINALIZE函数时,SINGLE 模式能够适度的性能提升。
- REDUCE
- 要求。一个REDUCE定义命名TRANSITION | CONSOLIDATE | FINALIZE函数组成(key,
value)归约到最终的结果集。有几个用户能执行的预先定义REDUCE作业,它们所有操作在一个名为value的列上:
IDENTITY - 返回没有改变的(key, value) 对
SUM - 计算数值数据的和
AVG - 计算数值数据的平均值
COUNT - 计算输入数据的计数
MIN - 计算数值数据的最小数值
MAX - 计算数值数据的最大值
- NAME
- 要求。该REDUCE作业的名称。一个名称对比该MapReduce作业中的其它对象(例如map函数、任务、reduce函数以及输出名称)的名称是唯一的。另外,名称也不能同数据库已经存在的对象(例如,表、函数或者视图)名称冲突。
- TRANSITION
- 要求。 TRANSITION 函数的名称。
- CONSOLIDATE
- 可选。CONSOLIDATE 函数的名称。
- FINALIZE
- 可选。 FINALIZE 函数的名称。
- INITIALIZE
- 对于text 和float数据类型是可选的。对于其它的数据类型是要求的。text的默认值为''。float的默认值为0.0。TRANSITION函数的初始state值为集合。 Sets the initial state value of the TRANSITION function.
- KEYS
- 可选。默认为[key, *]。当使用多个列进行归约,有必要指定哪些列为键(key)列以及哪些列为值(value)列。 默认的,任何不传递给TRANSITION函数的列为键列,以及一个名为key的列总是键列,即使它被传递给了TRANSITION函数。专门的指示符*表明所有的列都不会传递给TRANSITION函数。如果该指示符在键的列表中没有呈现,那么所有没有匹配的列都将会被丢弃。
- TASK
- 可选。一个TASK定义了一个在Greenplum MapReduce作业流水线内完整的端到端的 INPUT/MAP/REDUCE 阶段。和 EXECUTE 很相似,处理它不是立刻执行。一个任务对象能够作为INPUT调用一直到更进一步的处理阶段。
- EXECUTE
- 要求。EXECUTE定义在Greenplum MapReduce作业流水线中最后的 INPUT/MAP/REDUCE阶段。
关于Greenplum MapReduce文档的示例
# 该MapReduce作业示例处理文档同时查找它们中的关键词。 # 它接受两个数据库的表作为输入: # - 文档 (doc_id integer, url text, data text) # - 关键词 (keyword_id integer, keyword text)# # 在文档数据中搜素关键词的出现,返回url、data、keyword的结果(一个keyword可以为多个单词,例如"high performance # computing") %YAML 1.1 --- VERSION:1.0.0.1 # 使用该数据库和角色连接到Greenplum数据库 DATABASE:webdata USER:jsmith # 开始定义部分 DEFINE: # 声明输入,从‘documents’和‘keyword’表中选择所有的列和行。 - INPUT: NAME:doc TABLE:documents - INPUT: NAME:kw TABLE:keywords # 定义映射函数从documents和keyword中提取术语 # 该示例简单的利用空格进行分割,但是这里可以 # 利用像python的nltk(the natural language toolkit)库 # 来执行更复杂的单词标记和提取词干。 - MAP: NAME:doc_map LANGUAGE:python FUNCTION:| i = 0 # 文档中一个单词的索引 terms = {}#文档中术语和他们索引的一个hash # 变成小写形式同时用空格分割字符串 for term in data.lower().split(): i = i + 1# 增加i(索引) # 检查在术语列表中的术语Check for the term in the terms list: # 如果主干词语已经存在,添加i的值到数组的入口 # 对应术语。这里考虑一个词语的多次出现。 # 如果主干词语不存在,添加它到词典的位置i处。 # 例如: # data: "a computer is a machine that manipulates data" # "a" [1, 4] # "computer" [2] # "machine" [3] # … if term in terms: terms[term] += ','+str(i) else: terms[term] = str(i) # 每个文档返回多个行。每行由 # doc_id, term 以及 出现term的数据的位置 # For example: # (doc_id => 100, term => "a", [1,4] # (doc_id => 100, term => "computer", [2] # … for term in terms: yield([doc_id, term, terms[term]]) OPTIMIZE:STRICT IMMUTABLE PARAMETERS: - doc_id integer - data text RETURNS: - doc_id integer - term text - positions text # 关键词的关于文档的映射函数差不多为一个The map function for keywords is almost identical to the one for documents # 但是它也计算了关键词中术语的数目。but it also counts of the number of terms in the keyword. - MAP: NAME:kw_map LANGUAGE:python FUNCTION:| i = 0 terms = {} for term in keyword.lower().split(): i = i + 1 if term in terms: terms[term] += ','+str(i) else: terms[term] = str(i) # 输出四个值包括i(术语出现的总次数):output 4 values including i (the total count for term in terms): yield([keyword_id, i, term, terms[term]]) OPTIMIZE:STRICT IMMUTABLE PARAMETERS: - keyword_id integer - keyword text RETURNS: - keyword_id integer - nterms integer - term text - positions text # 一个任务是一个定义了在Greenplum MapReduce流水线上整个INPUT/MAP/REDUCE的阶段。 # 这很像是一个EXECUTION,但是只有在一输入到其他处理阶段被调用时,才会执行。 # 识别一个称为'doc_prep'的任务,该任务接受先前定义的 'doc' INPUT # 同时执行‘doc_map’ MAP函数返回doc_id, term, [term_position] - TASK: NAME:doc_prep SOURCE:doc MAP:doc_map # 识别一个称为'kw_prep'的任务,该任务接受 先前定义的'kw' INPUT # 同时执行kw_map MAP 函数返回kw_id, term, [term_position] - TASK: NAME:kw_prep SOURCE:kw MAP:kw_map # Greenplum MapReduce的一个优势是MapReduce任务可以作为 # SQL操作的输入,同时SQL也能用来执行一个MapReduce任务。 # 该INPUT定义了一个SQL查询,此查询将'doc_prep' TASK的输出同 # 'kw_prep' TASK的进行连接。匹配项是'candidate'列表的输出 Matching terms are output to the 'candidate' # (任何一个关键词至少共享同文档共享一个term)。any keyword that shares at least one term with the document). - INPUT: NAME: term_join QUERY: | SELECT doc.doc_id, kw.keyword_id, kw.term, kw.nterms, doc.positions as doc_positions, kw.positions as kw_positions FROM doc_prep doc INNER JOIN kw_prep kw ON (doc.term = kw.term) # 在Greenplum的MapReduce中,一个REDUCE函数有一个或者多个函数组成。 # 一个REDUCE为每个分组键定义有一个初始的‘state’变量。 that is # 这是一个用于调节每个键分组状态的过渡函数。 # 如果呈现,一个可选的CONSOLIDATE函数结合了多个 # 'state' 变量。这允许TRANSITION函数能够在segment级别执行在本地,同时值在整个网络中 # 重分布积累的‘state’。如果呈现,一个可选FINALIZE函数被使用来执行在一个状态上最后的计算 # 同时发射从状态来的一个或者多个输出行。 # # 该REDUCE函数被称为‘term_reducer’带有一个被称为'term_transition'的TRANSITION函数 #以及一个被称为'term_finalizer'的FINALIZE函数。 - REDUCE: NAME:term_reducer TRANSITION:term_transition FINALIZE:term_finalizer - TRANSITION: NAME:term_transition LANGUAGE:python PARAMETERS: - state text - term text - nterms integer - doc_positions text - kw_positions text FUNCTION: | # 'state' 有一个''的初始值以及使用冒号分隔的关键词位置的集合。 # 关键词位置是用逗号分隔的整数集。例如,'1,3,2:4:' # 如果这里存在一个状态,分割它到关键词位置的集合,否则, # 构建一个'nterms'关键词位置的集合。所有的空 if state: kw_split = state.split(':') else: kw_split = [] for i in range(0,nterms): kw_split.append('') # 'kw_positions'是一个整数分隔的逗号域,显示 # 单个在给定关键词出现的位置。 # 基于','分割转换字符串到一个python 列表。 # 为当前term添加doc_positions for kw_p in kw_positions.split(','): kw_split[int(kw_p)-1] = doc_positions # 该部分接受每个在'kw_split'数组中的元素,同时使用':'连接,将它们 # 转换为字符串。 # 例如:对于关键词 "computer software computer hardware", # 与文档数据"in the business of computer software software engineers" 匹配的 # 'kw_split'数组将为 ['5', '6,7', '5', ''] # 同时输出状态将为:5:6,7:5: outstate = kw_split[0] for s in kw_split[1:]: outstate = outstate + ':' + s return outstate - FINALIZE: NAME: term_finalizer LANGUAGE: python RETURNS: - count integer MODE:MULTI FUNCTION:| if not state: return 0 kw_split = state.split(':') # 函数做了下面的事情: # 1) 以':'分割'kw_split' # 例如, 1,5,7:2,8 创建了 '1,5,7' 和 '2,8' # 2) 在'kw_split'中的每个组的位置,以','分割集合, # 从集合0: 1,5,7创建['1','5','7'] # 以及从集合1: 2,8创建 ['2', '8'] # 3)检查空字符串 # 4) 通过减去集合在'kw_split'中的位置来调整分割集 # ['1','5','7'] - 0(从每个元素中) = ['1','5','7'] # ['2', '8'] - 1 (从每个元素中) = ['1', '7'] # 5)以步长为4的来截取数组后的结果数组被分割,他们的重叠值为: # ['1','5','7'].intersect['1', '7'] = [1,7] # 6) 决定分割长度,整个关键词(包括所有的小片)在文档数据中匹配的次数。 previous = None for i in range(0,len(kw_split)): isplit = kw_split[i].split(',') if any(map(lambda(x): x == '', isplit)): return 0 adjusted = set(map(lambda(x): int(x)-i, isplit)) if (previous): previous = adjusted.intersection(previous) else: previous = adjusted # 返回最终的计数 if previous: return len(previous) # 定义'term_match' 任务,该任务之后作为 # 'final_output'查询的一部分执行。它接受之前定义的输入 (INPUT) 'term_join' # 同时使用之前定义的归约函数'term_reducer' - TASK: NAME:term_match SOURCE:term_join REDUCE:term_reducer - INPUT: NAME:final_output QUERY:| SELECT doc.*, kw.*, tm.count FROM documents doc, keywords kw, term_match tm WHERE doc.doc_id = tm.doc_id AND kw.keyword_id = tm.keyword_id AND tm.count > 0 # 执行该MapReduce作业,发送结果到STDOUT EXECUTE: - RUN: SOURCE:final_output TARGET:STDOUT
MapReduce实例的流程图
下面的图显示了在示例中定义的MapReduce作业的流程: