HBase (7): Advanced HBase Programming


I. HBase with MapReduce

Why use MapReduce to access the data stored in HBase?
Because it speeds up analysis and scales out analysis capacity.
Using MapReduce to analyze HBase data is strictly an offline-analysis scenario.


1. HBaseToHDFS

Read the data from HBase, analyze it, and then write the results to HDFS. Implementation:

package com.ghgj.hbase.hbase2hdfsmr;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Purpose: read the user_info table from HBase and write its data out to HDFS.
 */
public class HBaseToHDFSMR {

    private static final String ZK_CONNECT = "hadoop03:2181,hadoop04:2181,hadoop05:2181";

    public static void main(String[] args) throws Exception {

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", ZK_CONNECT);
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        // conf.set("fs.defaultFS", "hdfs://myha01/");

        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseToHDFSMR.class);

        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("name"));
        /*
         * TableMapReduceUtil: a class ending in Util is a helper/utility;
         * a class ending in Factory is a factory whose main job is managing object creation.
         */
        TableMapReduceUtil.initTableMapperJob("user_info", scan,
                HBaseToHDFSMRMapper.class, Text.class, NullWritable.class, job);
        job.setReducerClass(HBaseToHDFSMRReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Clear the output directory if it already exists
        Path outputPath = new Path("/hbase2hdfs/output");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean waitForCompletion = job.waitForCompletion(true);
        System.exit(waitForCompletion ? 0 : 1);
    }

    static class HBaseToHDFSMRMapper extends TableMapper<Text, NullWritable> {
        /**
         * key: the rowkey.
         * value: the Result instance passed to each map() call; it holds the
         *        rowkey, family, qualifier, value and timestamp of the row.
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            String rowkey = Bytes.toString(key.copyBytes());
            System.out.println(rowkey);
            List<Cell> cells = value.listCells();
            for (int i = 0; i < cells.size(); i++) {
                Cell cell = cells.get(i);
                String rowkey_result = Bytes.toString(cell.getRow()) + "\t"
                        + Bytes.toString(cell.getFamily()) + "\t"
                        + Bytes.toString(cell.getQualifier()) + "\t"
                        + Bytes.toString(cell.getValue()) + "\t"
                        + cell.getTimestamp();
                context.write(new Text(rowkey_result), NullWritable.get());
            }
        }
    }

    static class HBaseToHDFSMRReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}
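One way to submit this job (a sketch only; the jar name hbase2hdfsmr.jar is an assumption for illustration, and it assumes the hbase command is on the PATH) is to put the HBase client jars on the Hadoop classpath and then run the driver class:

export HADOOP_CLASSPATH=$(hbase mapredcp)
hadoop jar hbase2hdfsmr.jar com.ghgj.hbase.hbase2hdfsmr.HBaseToHDFSMR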

2. HDFSToHBase

Read the data from HDFS, process it, and then write it into HBase. Implementation:

package com.ghgj.hbase.hbase2hdfsmr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class HDFSToHBaseMR {
    private static final String ZK_CONNECT = "hadoop03:2181,hadoop04:2181,hadoop05:2181";
    private static final String TABLE_NAME = "person_info";

    public static void main(String[] args) throws Exception {

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", ZK_CONNECT);
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Job job = Job.getInstance(conf);
        job.setJarByClass(HDFSToHBaseMR.class);

        // The following block creates an HBase table named person_info
        HBaseAdmin admin = new HBaseAdmin(conf);
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
        htd.addFamily(new HColumnDescriptor("base_info"));
        if (admin.tableExists(TABLE_NAME)) {
            admin.disableTable(TABLE_NAME);
            admin.deleteTable(TABLE_NAME);
        }
        admin.createTable(htd);

        // Set the mapper class and the table reducer for the job
        job.setMapperClass(HDFSToHBaseMRMapper.class);
        TableMapReduceUtil.initTableReducerJob(TABLE_NAME, HDFSToHBaseMRReducer.class, job);

        // Output key-value types for the mapper and the reducer
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Mutation.class);

        // Input data path
        FileInputFormat.setInputPaths(job, new Path("/hbase2hdfs/output"));

        // Submit the job
        boolean boo = job.waitForCompletion(true);
        System.exit(boo ? 0 : 1);
    }

    static class HDFSToHBaseMRMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    /**
     * TableReducer extends Reducer; its only effect is fixing the
     * output value type to Mutation.
     */
    static class HDFSToHBaseMRReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> {

        /**
         * Example input line:
         * baiyc_20150716_0001  base_info  name  baiyc1  1488348387443
         */
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {

            String[] splits = key.toString().split("\t");
            String rowkeyStr = splits[0];
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable(Bytes.toBytes(rowkeyStr));

            Put put = new Put(Bytes.toBytes(rowkeyStr));

            String family = splits[1];
            String qualifier = splits[2];
            String value = splits[3];
            String ts = splits[4];

            put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Long.parseLong(ts), Bytes.toBytes(value));

            context.write(rowkey, put);
        }
    }
}

II. Moving data between HBase and MySQL

1. Importing MySQL data into HBase (with Sqoop)

Command:

sqoop import --connect jdbc:mysql://hadoop01/mytest --username root --password root \
--table student --hbase-create-table --hbase-table studenttest --column-family name \
--hbase-row-key id


This command may fail with: Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.HTableDescriptor.addFamily(Lorg/apache/hadoop/hbase/HColumnDescriptor;)V. The error is caused by a version incompatibility; the workaround is to create the HBase table in advance instead of letting Sqoop create it.
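For example, the table can be pre-created in the HBase shell (a sketch of that workaround; the table and column family names simply follow the import command below):

create 'studenttest1', 'name'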
Then run the import without the --hbase-create-table option:

sqoop import --connect jdbc:mysql://hadoop01/mytest --username root --password root \
--table student --hbase-table studenttest1 --column-family name --hbase-row-key id


--hbase-create-table    create the table in HBase automatically
--column-family name    the column family to write into
--hbase-row-key id      the MySQL column to use as the HBase rowkey

2. Exporting HBase data to MySQL

There is currently no direct command for exporting HBase data to MySQL; instead, first export the HBase data to HDFS, then export that data to MySQL.

Workaround:
First load the HBase data into HDFS or Hive, then import that data into MySQL (see the sqoop export sketch below).
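As a sketch of that last leg (the target MySQL table student_from_hbase, the HDFS directory and the tab delimiter are assumptions for illustration), the HDFS-to-MySQL step can be done with sqoop export:

sqoop export --connect jdbc:mysql://hadoop01/mytest --username root --password root \
--table student_from_hbase --export-dir /hbase2hdfs/output \
--input-fields-terminated-by '\t'

The MySQL table has to exist beforehand, and its column order must match the fields in the exported files.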

III. Integrating HBase with Hive

How it works:

Hive and HBase are integrated through the public APIs of both systems, communicating mainly through HBaseStorageHandler. Via HBaseStorageHandler, Hive can obtain the HBase table name, column families and columns that correspond to a Hive table, as well as the InputFormat and OutputFormat classes, and it can create and delete HBase tables.

When Hive accesses data in an HBase table it actually reads it through MapReduce: inside the MR job, HiveHBaseTableInputFormat splits the HBase table and obtains RecordReader objects to read the data.

The split rule for an HBase table is one Region per Split, so a table with N regions produces N map tasks in the MR job.

Reading the HBase table data is done by building a Scanner and scanning the whole table; filter conditions are converted into Filters, and a condition on the rowkey becomes a rowkey filter. The Scanner fetches data by calling next() on the RegionServer over RPC.

1. Prepare the HBase table and its data

create 'mingxing',{NAME => 'base_info',VERSIONS => 1},{NAME => 'extra_info',VERSIONS => 1}

Insert some data:

put 'mingxing','rk001','base_info:name','huangbo'
put 'mingxing','rk001','base_info:age','33'
put 'mingxing','rk001','extra_info:math','44'
put 'mingxing','rk001','extra_info:province','beijing'
put 'mingxing','rk002','base_info:name','xuzheng'
put 'mingxing','rk002','base_info:age','44'
put 'mingxing','rk003','base_info:name','wangbaoqiang'
put 'mingxing','rk003','base_info:age','55'
put 'mingxing','rk003','base_info:gender','male'
put 'mingxing','rk004','extra_info:math','33'
put 'mingxing','rk004','extra_info:province','tianjin'
put 'mingxing','rk004','extra_info:children','3'
put 'mingxing','rk005','base_info:name','liutao'
put 'mingxing','rk006','extra_info:name','liujialing'

2. Hive-side operations

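The original post leaves this step empty. As a minimal sketch of what the Hive side could look like (the Hive table name mingxing_hive and the particular columns mapped are assumptions; adjust hbase.columns.mapping as needed), an external Hive table can be mapped onto the existing HBase table mingxing through HBaseStorageHandler:

CREATE EXTERNAL TABLE mingxing_hive (
  rowkey string,
  name   string,
  age    string,
  math   string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
  "hbase.columns.mapping" = ":key,base_info:name,base_info:age,extra_info:math"
)
TBLPROPERTIES ("hbase.table.name" = "mingxing");

After that the HBase rows can be queried from Hive, for example SELECT * FROM mingxing_hive; analysis queries against this table run as MapReduce jobs that scan the HBase table, as described in the principle section above.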

IV. HBaseToHBase by MapReduce

package com.ghgj.hbase.hbase2hdfsmr;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class HBaseToHBaseByMR {

    private static final String ZK_CONNECT = "hadoop03:2181,hadoop04:2181,hadoop05:2181";
    private static final String OLD_TABLE_NAME = "user_info";
    private static final String NEW_TABLE_NAME = "person_info2";
    private static final String FAMILY = "base_info";
    private static final String QUALIFIER = "age";

    public static void main(String[] args) throws Exception {

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", ZK_CONNECT);
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        // conf.set("fs.defaultFS", "hdfs://myha01/");

        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseToHBaseByMR.class);

        // The following block creates an HBase table named person_info2
        HBaseAdmin admin = new HBaseAdmin(conf);
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(NEW_TABLE_NAME));
        htd.addFamily(new HColumnDescriptor(FAMILY));
        if (admin.tableExists(NEW_TABLE_NAME)) {
            admin.disableTable(NEW_TABLE_NAME);
            admin.deleteTable(NEW_TABLE_NAME);
        }
        admin.createTable(htd);

        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(QUALIFIER));
        /*
         * TableMapReduceUtil: a class ending in Util is a helper/utility;
         * a class ending in Factory is a factory whose main job is managing object creation.
         */
        TableMapReduceUtil.initTableMapperJob(OLD_TABLE_NAME, scan, HBaseToHBaseByMRMapper.class, Text.class, NullWritable.class, job);
        TableMapReduceUtil.initTableReducerJob(NEW_TABLE_NAME, HBaseToHBaseByMRReducer.class, job);

        // Output key-value types for the mapper and the reducer
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Mutation.class);

        boolean waitForCompletion = job.waitForCompletion(true);
        System.exit(waitForCompletion ? 0 : 1);
    }

    static class HBaseToHBaseByMRMapper extends TableMapper<Text, NullWritable> {
        /**
         * key: the rowkey. value: the Result instance passed to each map() call;
         * it holds the rowkey, family, qualifier, value and timestamp of the row.
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            String rowkey = Bytes.toString(key.copyBytes());
            System.out.println(rowkey);
            List<Cell> cells = value.listCells();
            for (int i = 0; i < cells.size(); i++) {
                Cell cell = cells.get(i);
                String rowkey_result = Bytes.toString(cell.getRow()) + "\t" + Bytes.toString(cell.getFamily()) + "\t"
                        + Bytes.toString(cell.getQualifier()) + "\t" + Bytes.toString(cell.getValue()) + "\t"
                        + cell.getTimestamp();
                context.write(new Text(rowkey_result), NullWritable.get());
            }
        }
    }

    /**
     * TableReducer extends Reducer; its only effect is fixing the
     * output value type to Mutation.
     */
    static class HBaseToHBaseByMRReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> {

        /**
         * Example input line:
         * baiyc_20150716_0001  base_info  name  baiyc1  1488348387443
         */
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

            String[] splits = key.toString().split("\t");
            String rowkeyStr = splits[0];
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable(Bytes.toBytes(rowkeyStr));

            Put put = new Put(Bytes.toBytes(rowkeyStr));

            String family = splits[1];
            String qualifier = splits[2];
            String value = splits[3];
            String ts = splits[4];

            put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Long.parseLong(ts), Bytes.toBytes(value));

            context.write(rowkey, put);
        }
    }
}