Kettle使用文档.docx
- 文档编号:12395655
- 上传时间:2023-06-05
- 格式:DOCX
- 页数:44
- 大小:3.42MB
Kettle使用文档.docx
《Kettle使用文档.docx》由会员分享,可在线阅读,更多相关《Kettle使用文档.docx(44页珍藏版)》请在冰点文库上搜索。
Kettle使用文档
1.Kettle简介
Kettle是一款国外开源的ETL工具,纯java编写,可以在Window、Linux、Unix上运行,数据抽取高效稳定。
Kettle中文名称叫水壶,该项目的主程序员MATT希望把各种数据放到一个壶里,然后以一种指定的格式流出。
Kettle主要由资源库、数据库、作业(job)、转换、步骤五部分组成,资源库是用来存储用户所编写的作业和转换(即kjb文件和ktr文件)一般是在数据库中存储,方便用户的查找和使用。
数据库就是处理数据是需要的数据库。
作业是用来确定一个工程中需要使用的转换和转换的执行顺序。
转换是数据在ktr文件中的具体转换过程,类似于Java的一个方法,而作业就类似于java的一个类,它可以调用各种不同的方法(转换)。
2.Kettle框架搭建
(1)下载安装
可以从http:
//kettle.pentaho.org下载最新版的Kettle软件,同时,Kettle是绿色软件,下载后,解压到任意目录即可。
(2)配置环境变量
使用Kettle前提是配置好Java的环境变量,因为Kettle是java编写,需要本地的JVM的运行环境。
配置Java环境变量可参考:
配置Kettle环境变量步骤:
一、在系统的环境变量中添加KETTLE_HOME变量,目录指向kettle的安装目录:
D:
kettledata-integration(具体以安装路径为准)
二、新建系统变量:
KETTLE_HOME
变量值:
D:
kettledata-integration(具体以安装路径为准,Kettle的解压路径,直到Kettle.exe所在目录)
三、选择PATH添加环境变量:
变量名:
PATH
变量值:
%KETTLE_HOME%;
(3)Kettle工具的运行
在Windows系统下运行,只需要解压kettle文件后,双击data-integration文件夹中的Spoon.bat文件
在Linux下运行则双击data-integration文件夹中的Spoon.sh文件
3.Kettle的基本概念
(1)作业(job)
负责将【转换】组织在一起进而完成某一块工作,通常我们需要把一个大的任务分解成几个逻辑上隔离的作业,当这几个作业都完成了,也就说明这项任务完成了。
1.JobEntry:
一个JobEntry是一个任务的一部分,它执行某些内容。
2.Hop:
一个Hop代表两个步骤之间的一个或者多个数据流。
一个Hop总是代表着两个JobEntry之间的连接,并且能够被原始的JobEntry设置,无条件的执行下一个JobEntry,
直到执行成功或者失败。
3.Note:
一个Note是一个任务附加的文本注释信息。
1)创建作业
2)START控件
核心对象-通用-START中选取
START是一个job的任务入口,只有无条件的任务可以从此入口开始任务,也可以在此设置任务定时运行和重复运行。
3)成功控件
核心对象-通用-成功中选取
代表job的结束,没有任何数据的操作
4)转换
核心对象-通用-转换中选取
是job中具体的数据转换操作,只要把之前写好的ktr文件路径设置上去,就可以运行对应的ktr文件
(2)转换(Transformation)
定义对数据操作的容器,数据操作就是数据从输入到输出的一个过程,可以理解为比作业粒度更小一级的容器,我们将任务分解成作业,然后需要将作业分解成一个或多个转换,每个转换只完成一部分工作。
1.Value:
Value是行的一部分,并且是包含以下类型的的数据:
Strings、floatingpointNumbers、unlimitedprecisionBigNumbers、Integers、Dates、或者Boolean。
2.Row:
一行包含0个或者多个Values。
3.OutputStream:
一个OutputStream是离开一个步骤时的行的堆栈。
4.InputStream:
一个InputStream是进入一个步骤时的行的堆栈。
5.Step:
转换的一个步骤,可以是一个Stream或是其他元素。
6.Hop:
一个Hop代表两个步骤之间的一个或者多个数据流。
一个Hop总是代表着一个步骤的输出流和一个步骤的输入流。
7.Note:
一个Note是一个转换附加的文本注释信息。
1)创建转换
创建简单的转换,首先配置数据库连接,一个转换中可以创建多个数据库连接,而且是不同类型的数据库也都可以,方便从一个数据库中抽取数据到另一个数据库中
2)表输入(数据来源)
从核心组件树菜单中拖拽一个表输入组件,用于编写sql语句
3)表输出(数据接收)
从核心组件树菜单中拖拽一个表输出组件,用于将上一步骤传递的数据保存到数据库中
4)获取系统信息
双击后弹出页面如下图:
点击类型弹出下图框如下图所示:
点击选择所需要的信息即可
5)字段选择
双击打开控件:
选择和修改:
制定需要流向输出流的字段的精度顺序和名称。
获取选择的字段:
默认获取输入流中的字段。
列映射:
指的是字段名称==》字段该名成之后的映射,如果选择了移除或者元数据,则不能生成映射
移除:
指定从输出流删除的字段。
获取移除的字段:
默认获取所有的输入流字段
元数据:
修改元数据字段的名称、类型、长度、精度等。
6)合并记录
功能描述:
合并两个数据流,并根据某个关键字排序。
这两个数据被比较,以标识相等的、变更的、删除的和新建的记录。
这个步骤允许你比较两个行流,如果你想在两个不同的时间比较数据,这个是比较有用的。
两个行流被合并,一个旧数据源,一个是新数据源
双击【合并记录】控件,根据表示字段flagfield的值判断相等、变更、删除的和新建的记录。
1.“identical”–旧数据和新数据一样
2.“changed”–数据发生了变化;
3.“new”–新数据中有而旧数据中没有的记录
4.“deleted”–旧数据中有而新数据中没有的记录
关键字段:
用于定位两个数据源中的同一条记录。
比较字段:
对于两个数据源中的同一条记录中,指定需要比较的字段。
合并后的数据将包括旧数据来源和新数据来源里的所有数据,对于变化的数据,使用新数据代替旧数据,同时在结果里用一个标示字段,来指定新旧数据的比较结果。
注意:
旧数据和新数据需要事先按照关键字段排序。
旧数据和新数据要有相同的字段名称。
例子:
旧数据:
新数据合并后的数据
field1,field2field1,field2field1;field2;flag
1,11,11;1;identical
2,22,92;9;changed
3,35,53;3;deleted
4,44;4;deleted
5;5;new
4.Kettle使用
打开kettle工具后会出现如下界面:
(1)新建作业
路径:
文件–>新建作业。
(2)新建2个连接(mysql、oracle)
路径:
主对象树-作业-作业1-DB连接。
连接名称:
数据库连接名称(随意填写,不能为空)
连接:
连接的数据库类型
连接方式:
数据库的连接方式(本文档只选择了Native的连接方式)
主机名称:
数据库具体的Ip地址
数据库名称:
指定连接的数据库名称
端口号:
数据库监听的tcp/ip端口号
用户名:
指定连接数据库登录时的用户名
密码:
指定连接数据库登录时的密码
填写完数据库连接的基本信息后,点解界面中的测试按钮进行数据库连接测试,如成功连接会弹出如下信息,反之则会弹出出错信息,出错后根据相关的出错信息进行修改。
Oracle数据库同理:
注:
需要把数据库连接需要的jar包添加到lib文件夹下。
(1)Kettle实现数据库整库迁移
一、采用软件自带方式
首先在目标数据库中先创建一个抽取记录表TIME_TABLE
表内2个字段为:
etl_time(抽取时间)
table_name(抽取表名)
1.新建一个job
创建两个DB连接:
mysql、ORCL(源数据库和目标数据库连接),在菜单中找到【复制多表向导】,点击进行相关操作:
2.选择源数据库和目标数据库
3.选择所需迁移的表
4.编辑生成的job文件名:
qy.kjb,和文件目录,编辑好后+【Finish】
点击Finish之后界面如下图所示,
5.把当前抽取时间和表名记录到创建的TIME_TABLE表中,通过SQL脚本,核心对象-脚本-SQL,左键拖拽进主页面通过Shift将其连接起来
在这个SQL脚本中填写insert语句,把当前时间和表名添加到数据库抽取记录表中
6.点击运行按钮
二、自定义数据库迁移(oracle>>mysql)
1.创建一个主job:
数据库迁移.kjb
2.创建转换:
获取表名称.ktr
1)创建一个DB连接:
oraclesource(源数据库连接)
2) 表输入
注:
oracle中获取表名SQL:
select*fromuser_tables,mysql中为:
showtables
3)字段选择
点击【获取选择的字段】,添加新的字段,选中"TABLE_NAME"并改名成“tablename”,将其余字段删除
4)添加一个【复制记录到结果字符串】
3.新建一个job:
抽取表.kjb
1)新建一个DB连接:
mysqltarget
4.新建一个转换:
表名称变量设置.ktr
1)添加一个【从结果获取记录】
2)添加一个【设置变量】
5.添加一个【检查表是否存在】
6.新建一个转换:
创建目标库表结构.ktr
1)新建两个DB连接:
oraclesource,mysqltarget创建过程同上
2)添加一个【表输入】
3)添加一个【Java代码】,代码如下:
1.
publicbooleanprocessRow(StepMetaInterfacesmi,StepDataInterfacesdi)throwsKettleException
2.
3.
{
4.
5.
Object[]r=getRow();
6.
7.
8.
//本地连接获取数据库元数据
9.
10.
//org.pentaho.di.core.database.DatabaseMetadbmeta=getTransMeta().findDatabase("target");
11.
12.
13.
//资源库连接获取数据库元数据
14.
15.
org.pentaho.di.core.database.DatabaseMetadbmeta=null;
16.
17.
18.
java.util.Listlist=getTrans().getRepository().readDatabases();
19.
20.
21.
if(list!
=null&&!
list.isEmpty())
22.
23.
{
24.
25.
for(inti=0;i 26. 27. { 28. 29. dbmeta=(org.pentaho.di.core.database.DatabaseMeta)list.get(i); 30. 31. if("target".equalsIgnoreCase(dbmeta.getName())) 32. 33. { 34. 35. break; 36. 37. } 38. 39. } 40. 41. } 42. 43. 44. if(dbmeta! =null) 45. 46. { 47. 48. org.pentaho.di.core.database.Databasedb=neworg.pentaho.di.core.database.Database(dbmeta); 49. 50. 51. try 52. 53. { 54. 55. db.connect(); 56. 57. 58. Stringtablename=getVariable("TABLENAME"); 59. 60. 61. //在日志中显示创建表名 62. 63. logBasic("开始创建表: "+tablename); 64. 65. 66. if(tablename! =null&&tablename.trim().length()>0) 67. 68. { 69. 70. 71. //获取表名称和输入行数据 72. 73. Stringsql=db.getDDLCreationTable(tablename,getInputRowMeta()); 74. 75. 76. db.execStatement(sql.replace(";","")); 77. 78. 79. //在日志中显示sql语句 80. 81. logBasic(sql); 82. 83. } 84. 85. } 86. 87. catch(Exceptione) 88. 89. { 90. 91. logError("创建表出现异常",e); 92. 93. 94. }finally{ 95. 96. db.disconnect(); 97. 98. } 99. 100. } 101. 102. returnfalse; 103. 104. } 105. 7.新建一个转换: 表抽取.ktr 1)新建两个DB连接: oraclesource,mysqltarget创建过程同上 2)添加一个【表输入】 3)添加一个表输出 8.运行数据库迁移.kjb (2)Kettle增量抽取方案 新建一个转换 在核心对象中找到【表输入】【表输入2】【表输出】【阻塞数据】 【执行SQL脚本】组件,并通过Shift连接在一起 【表输入】: 查询该表的最新的一次抽取时间 【表输入2】: 把上一步查询出来的最新抽取时间当做查询条件,查出要抽取的数据 【表输出】把查询出来要抽取的数据,抽取到oracle数据库中 【执行SQL语句】: 修改成目标数据库抽取过来的时间最大的一条记录,把数据抽取记录表的时间修改以便下一次抽取 表输入: 旧数据来源的步骤(来源于mysql数据库) 表输入2: 新数据来源的步骤(来源于oracle数据库) 合并记录: 将输入源与目的源的每个字段数据根据唯一字段比较后到值映射 值映射: 使用字段名为起的后面用到的一个变量名(可任意起),源值列为系统默认 1.“identical”–旧数据和新数据一样 2.“changed”–数据发生了变化; 3.“new”–新数据中有而旧数据中没有的记录 4.“deleted”–旧数据中有而新数据中没有的记录 过滤记录步骤: 通过flagfield的值来执行后面的分支,也可以通过Switch/Case组件执行后面的分支例如: 5.Kettle定时任务 新建一个【作业】,【核心对象】【通用】里面把【START】【转换】【成功】组件拖拽出来并通过Shift连接 【START】组件: 设置重复操作,时间间隔2秒 【转换】组件: 浏览,选择已保存的ktr文件 【成功】: 提示操作成功。 判断数据数据是否变化:
- 配套讲稿:
如PPT文件的首页显示word图标,表示该PPT已包含配套word讲稿。双击word图标可打开word文档。
- 特殊限制:
部分文档作品中含有的国旗、国徽等图片,仅作为作品整体效果示例展示,禁止商用。设计者仅对作品中独创性部分享有著作权。
- 关 键 词:
- Kettle 使用 文档
![提示](https://static.bingdoc.com/images/bang_tan.gif)