HBase的简单学习四

一 HBase的进阶

1.1 hbase的写流程

Hbase读取数据的流程:
1)是由客户端发起读取数据的请求,首先会与zookeeper建立连接
2)从zookeeper中获取一个hbase:meta表位置信息,被哪一个regionserver所管理着
     hbase:meta表:hbase的元数据表,在这个表中存储了自定义表相关的元数据,包括表名,表有哪些列簇,表有哪些region,每个region存储的位置,每个region被哪个regionserver所管理,这个表也是存储在某一个region上的,并且这个meta表只会被一个regionserver所管理。这个表的位置信息只有zookeeper知道。
3)连接这个meta表对应的regionserver,从meta表中获取当前你要读取的这个表对应的regionsever是谁。
     当一个表多个region怎么办呢?
     如果我们获取数据是以get的方式,只会返回一个regionserver
     如果我们获取数据是以scan的方式,会将所有的region对应的regionserver的地址全部返回。
4)连接要读取表的对应的regionserver,从regionserver上的开始读取数据:
       读取顺序:memstore-->blockcache-->storefile-->Hfile中
       注意:如果是scan操作,就不仅仅去blockcache了,而是所有都会去找。

1.2 hbase的写流程

Hbase的写入数据流程:
1)由客户端发起写数据请求,首先会与zookeeper建立连接
2)从zookeeper中获取hbase:meta表被哪一个regionserver所管理
3)连接hbase:meta表中获取对应的regionserver地址 (从meta表中获取当前要写入数据的表对应的region所管理的regionserver) 只会返回一个regionserver地址
4)与要写入数据的regionserver建立连接,然后开始写入数据,将数据首先会写入到HLog,然后将数据写入到对应store模块中的memstore中
(可能会写多个),当这两个地方都写入完成之后,表示数据写入完成。


-------------------------后面的步骤是服务器内部的操作-----------------
异步操作
5)随着客户端不断地写入数据,memstore中的数据会越来多,当内存中的数据达到阈值(128M/1h)的时候,放入到blockchache中,生成新的memstore接收用户过来的数据,然后当blockcache的大小达到一定阈值(0.85)的时候,开始触发flush机制,将数据最终刷新到HDFS中形成小的Hfile文件。

6)随着不断地刷新,storefile不断地在HDFS上生成小HFIle文件,当小的HFile文件达到阈值的时候(3个及3个以上),就会触发Compaction机制,将小的HFile合并成一个大的HFile.

7)随着不断地合并,大的HFile文件会越来越大,当达到一定阈值(2.0版本之后最终10G)的时候,会触发分裂机制(split),将大的HFile文件进行一分为二,同时管理这个大的HFile的region也会被一分为二,形成两个新的region和两个新的HFile文件,一对一的进行管理,将原来旧的region和分裂之前大的HFile文件慢慢地就会下线处理。

1.3 Region的分裂策略

region中存储的是一张表的数据,当region中的数据条数过多的时候,会直接影响查询效率。当region过大的时候,region会被拆分为两个region,HMaster会将分裂的region分配到不同的regionserver上,这样可以让请求分散到不同的RegionServer上,已达到负载均衡 , 这也是HBase的一个优点 。

  • ConstantSizeRegionSplitPolicy

    0.94版本前,HBase region的默认切分策略

    当region中最大的store大小超过某个阈值(hbase.hregion.max.filesize=10G)之后就会触发切分,一个region等分为2个region。

    但是在生产线上这种切分策略却有相当大的弊端(切分策略对于大表和小表没有明显的区分):

    • 阈值(hbase.hregion.max.filesize)设置较大对大表比较友好,但是小表就有可能不会触发分裂,极端情况下可能就1个,形成热点,这对业务来说并不是什么好事。

    • 如果设置较小则对小表友好,但一个大表就会在整个集群产生大量的region,这对于集群的管理、资源使用、failover来说都不是一件好事。

  • IncreasingToUpperBoundRegionSplitPolicy

    0.94版本~2.0版本默认切分策略

    总体看和ConstantSizeRegionSplitPolicy思路相同,一个region中最大的store大小大于设置阈值就会触发切分。 但是这个阈值并不像ConstantSizeRegionSplitPolicy是一个固定的值,而是会在一定条件下不断调整,调整规则和region所属表在当前regionserver上的region个数有关系.

    region split阈值的计算公式是:

    • 设regioncount:是region所属表在当前regionserver上的region的个数

    • 阈值 = regioncount^3 * 128M * 2,当然阈值并不会无限增长,最大不超过MaxRegionFileSize(10G),当region中最大的store的大小达到该阈值的时候进行region split

    例如:

    • 第一次split阈值 = 1^3 * 256 = 256MB

    • 第二次split阈值 = 2^3 * 256 = 2048MB

    • 第三次split阈值 = 3^3 * 256 = 6912MB

    • 第四次split阈值 = 4^3 * 256 = 16384MB > 10GB,因此取较小的值10GB

    • 后面每次split的size都是10GB了

    特点

    • 相比ConstantSizeRegionSplitPolicy,可以自适应大表、小表;

    • 在集群规模比较大的情况下,对大表的表现比较优秀

    • 对小表不友好,小表可能产生大量的小region,分散在各regionserver上

    • 小表达不到多次切分条件,导致每个split都很小,所以分散在各个regionServer上

  • SteppingSplitPolicy

    2.0版本默认切分策略

    相比 IncreasingToUpperBoundRegionSplitPolicy 简单了一些 ​ region切分的阈值依然和待分裂region所属表在当前regionserver上的region个数有关系

    • 如果region个数等于1,切分阈值为flush size 128M

    • 否则为MaxRegionFileSize。

    这种切分策略对于大集群中的大表、小表会比 IncreasingToUpperBoundRegionSplitPolicy 更加友好,小表不会再产生大量的小region,而是适可而止。

  • KeyPrefixRegionSplitPolicy

    根据rowKey的前缀对数据进行分区,这里是指定rowKey的前多少位作为前缀,比如rowKey都是16位的,指定前5位是前缀,那么前5位相同的rowKey在相同的region中。

  • DelimitedKeyPrefixRegionSplitPolicy

    保证相同前缀的数据在同一个region中,例如rowKey的格式为:userid_eventtype_eventid,指定的delimiter为 _ ,则split的的时候会确保userid相同的数据在同一个region中。 按照分隔符进行切分,而KeyPrefixRegionSplitPolicy是按照指定位数切分。

  • BusyRegionSplitPolicy

    按照一定的策略判断Region是不是Busy状态,如果是即进行切分

    如果你的系统常常会出现热点Region,而你对性能有很高的追求,那么这种策略可能会比较适合你。它会通过拆分热点Region来缓解热点Region的压力,但是根据热点来拆分Region也会带来很多不确定性因素,因为你也不知道下一个被拆分的Region是哪个。

  • DisabledRegionSplitPolicy

    不启用自动拆分, 需要指定手动拆分

1.4  Compaction操作

 Minor Compaction:

  • 指选取一些小的、相邻的StoreFile将他们合并成一个更大的StoreFile,在这个过程中不会处理已经Deleted或Expired的Cell。一次 Minor Compaction 的结果是更少并且更大的StoreFile。

Major Compaction:

  • 指将所有的StoreFile合并成一个StoreFile,这个过程会清理三类没有意义的数据:被删除的数据TTL过期数据版本号超过设定版本号的数据。另外,一般情况下,major compaction时间会持续比较长,整个过程会消耗大量系统资源,对上层业务有比较大的影响。因此线上业务都会将关闭自动触发major compaction功能,改为手动在业务低峰期触发。

1.5 经典面试题

HBase适合存储PB级别的海量数据(百亿千亿量级条记录),如果根据记录主键Rowkey来查询,能在几十到百毫秒内返回数据。

那么HBase是如何做到的呢?

接下来,简单阐述一下数据的查询思路和过程。

查询过程

第1步:

项目有100亿业务数据,存储在一个HBase集群上(由多个服务器数据节点构成),每个数据节点上有若干个Region(区域),每个Region实际上就是HBase中一批数据的集合(一段连续范围rowkey的数据)。

我们现在开始根据主键RowKey来查询对应的记录,通过meta表可以帮我们迅速定位到该记录所在的数据节点,以及数据节点中的Region,目前我们有100亿条记录,占空间10TB。所有记录被切分成5000个Region,那么现在,每个Region就是2G。

由于记录在1个Region中,所以现在我们只要查询这2G的记录文件,就能找到对应记录。

第2步:

由于HBase存储数据是按照列族存储的。比如一条记录有400个字段,前100个字段是人员信息相关,这是一个列簇(列的集合);中间100个字段是公司信息相关,是一个列簇。另外100个字段是人员交易信息相关,也是一个列簇;最后还有100个字段是其他信息,也是一个列簇

这四个列簇是分开存储的,这时,假设2G的Region文件中,分为4个列族,那么每个列族就是500M。

到这里,我们只需要遍历这500M的列簇就可以找到对应的记录。

第3步:

如果要查询的记录在其中1个列族上,1个列族在HDFS中会包含1个或者多个HFile。

如果一个HFile一般的大小为100M,那么该列族包含5个HFile在磁盘上或内存中。

由于HBase的内存进入磁盘中的数据是排好序(字典顺序)的,要查询的记录有可能在最前面,也有可能在最后面,按平均来算,我们只需遍历2.5个HFile共250M,即可找到对应的记录。

第4步:

每个HFile中,是以键值对(key/value)方式存储,只要遍历文件中的key位置并判断符合条件即可

一般key是有限的长度,假设key/value比是1:24,最终只需要10M的数据量,就可获取的对应的记录。

如果数据在机械磁盘上,按其访问速度100M/S,只需0.1秒即可查到。

如果是SSD的话,0.01秒即可查到。

当然,扫描HFile时还可以通过布隆过滤器快速定位到对应的HFile,以及HBase是有内存缓存机制的,如果数据在内存中,效率会更高。

 二 HBase与Hive的集成

2.1 相关概念

HBase与Hive的对比

hive:

数据仓库:Hive的本质其实就相当于将HDFS中已经存储的文件在Mysql中做了一个双射关系,以方便使用HQL去管理查询。

用于数据分析、清洗:Hive适用于离线的数据分析和清洗,延迟较高。

基于HDFS、MapReduce:Hive存储的数据依旧在DataNode上,编写的HQL语句终将是转换为MapReduce代码执行。

HBase

数据库:是一种面向列族存储的非关系型数据库。

用于存储结构化和非结构化的数据:适用于单表非关系型数据的存储,不适合做关联查询,类似JOIN等操作。

基于HDFS:数据持久化存储的体现形式是HFile,存放于DataNode中,被ResionServer以region的形式进行管理。

延迟较低,接入在线业务使用:面对大量的企业数据,HBase可以直线单表大量数据的存储,同时提供了高效的数据访问速度。

2.1  配置相关文件

1.配置hive-site.xml 文件

	<property>
        <name>hive.zookeeper.quorum</name>
        <value>master,node1,node2</value>
    </property>

    <property>
        <name>hive.zookeeper.client.port</name>
        <value>2181</value>
    </property>

2.3 关联

1.

HBase中已经存储了某一张表,在Hive中创建一个外部表来关联HBase中的这张表

建立外部表的字段名要和hbase中的列名一致

前提是hbase中已经有表了

 示例,以后模仿这个

create external table students_hbase
(
id string,
name string,
age string,
gender string, 
clazz string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties ("hbase.columns.mapping" = "
:key,
info:name,
info:age,
info:gender,
info:clazz
")
tblproperties("hbase.table.name" = "default:students");

三 Phoenix

3.1 安装

1.准备好文件,上传到Linux中

2.解压 tar -zxvf phoenix-hbase-2.2-5.1.3-bin.tar.gz -C ../

3.2 配置文件

1.配置环境变量

/etc/profile

2.将phoenix-server-hbase-2.2-5.1.3.jar复制到所有节点的hbase lib目录下

3.3 启动

1.先要启动hbase

2.sqlline.py master,node1,node2

3.4 常用语法

1.建表语句

必须要设置主键

CREATE TABLE IF NOT EXISTS student_p1 (
 id VARCHAR NOT NULL PRIMARY KEY, 
 name VARCHAR,
 age BIGINT, 
 gender VARCHAR ,
 clazz VARCHAR
);

2.插入语句

语法 upsert into 表名 values(内容)

upsert into STUDENT values('1500100004','葛德曜',24,'男','理科三班');
upsert into STUDENT values('1500100005','宣谷芹',24,'男','理科六班');
upsert into STUDENT values('1500100006','羿彦昌',24,'女','理科三班');

3.查询语句

跟sql语句差不多

4.删除数据

语法 delete from 表名 where 主键名=主键值;

delete from student_p1 where id='1500100004';

5. 删除表

drop table student_p1;

6.退出

!quit

3.5 注意事项

1.在phoenix中创建表之后,我们可以使用 !table或者show table 查看所有的表,并以大写的形式展现给我们,我们使用sql语句查询的时候不分表的大小写。

2.在phoenix中创建表之后,可以在hbase中可以通过scan加大写的表名查看phoenix中的表,但是在hbase中创建的表,在phoenix就无法查看。如果想查看,则需要在phoenix中进行表的映射。映射方式有两种:视图映射和表映射

3、如何在phoneix中使用hbase原本的数据表呢?
        视图映射:视图并不是真正意义上的表,而是在phoneix创建一个映射关系,以表的形式将hbase中原本数据映射过来,可以在基础之上编写sql语句进行分析,需要注意的是,我们在视图上sql分析的时候,表名和列名需要加双引号。删除视图不会影响原本hbase中的数据,视图无法做修改,只能查询,视图在phoneix中被看作成一个只读表。
        表映射:建表的语法来说与视图映射相差一个单词,其他的没啥区别。使用上,表映射可以直接在phoneix中对表数据进行增删改查。将phoneix中表映射删了,原来hbase中的表也对应删除。
        
    4、映射查询的时候,主键可以不用加双引号,非主键的列必须加双引号

3.7映射

3.7.1 视图映射

1.Phoenix创建的视图是只读的,所以只能用来做查询,无法通过视图对源数据进行修改等操作

2.在phoenix创建视图, primary key 对应到hbase中的rowkey

3.创建的映射要与想要映射的hbase中的表名一致,且视图要被双引号包起来。

4.视图映射相关建表案例

创建的是映射,所以是view

CREATE view "students" (
 id VARCHAR NOT NULL PRIMARY KEY, 
 "info"."name" VARCHAR,
 "info"."age" VARCHAR, 
 "info"."gender" VARCHAR,
 "info"."clazz" VARCHAR
) column_encoded_bytes=0;

5.查询相关事项

a.映射创建好之后,想要查看映射内容,必须使用sql语句的时候将视图名用双引号包起来,才能查看

b.映射查询的时候,主键可以不用加双引号,非主键的列必须加双引号

c.删除视图不会影响原本hbase中的数据,视图无法做修改,只能查询,视图在phoneix中被看作成一个只读表。

drop view "视图名";

3.7.2 表映射

1.建表映射

1) 当HBase中已经存在表时,可以以类似创建视图的方式创建关联表,只需要将create view改为create table即可。

2)当HBase中不存在表时,可以直接使用create table指令创建需要的表,并且在创建指令中可以根据需要对HBase表结构进行显示的说明。

3)创建的映射要与想要映射的hbase中的表名一致,且表要被双引号包起来。

CREATE table "students" (
 id VARCHAR NOT NULL PRIMARY KEY, 
 "info"."name" VARCHAR,
 "info"."age" VARCHAR, 
 "info"."gender" VARCHAR,
 "info"."clazz" VARCHAR
) column_encoded_bytes=0;

2.使用create table创建的关联表,如果对表进行了修改,源数据也会改变,同时如果关联表被删除,源表也会被删除。但是视图就不会,如果删除视图,源数据不会发生改变。

3.8 sql案例

1.过滤出文科一班的学生总分前10的学生信息

过滤出文科一班的学生
select ID as id,"name" as name,"clazz" as clazz from "students" where "clazz"='文科一班';

将成绩表做切分转换
select regexp_split(ID,'-')[1] as student_id,regexp_split(ID,'-')[2] as subject_id,"score" as score from "scores";

在第二步的基础之上求每个学生的总分
select t1.student_id as student_id,sum(to_number(t1.score)) as sum_score from (select regexp_split(ID,'-')[1] as student_id,regexp_split(ID,'-')[2] as subject_id,"score" as score from "scores") t1 group by t1.student_id;

与步骤1的文科学生进行关联
select b1.id as student_id,b1.name as name,b1.clazz as clazz,to_char(b2.sum_score) as sum_score from (select ID as id,"name" as name,"clazz" as clazz from "students" where "clazz"='文科一班') b1 join (select t1.student_id as student_id,sum(to_number(t1.score)) as sum_score from (select regexp_split(ID,'-')[1] as student_id,regexp_split(ID,'-')[2] as subject_id,"score" as score from "scores") t1 group by t1.student_id) b2 on (b1.id=b2.student_id) order by b2.sum_score desc limit 10;

2.注意事项

通过这个例子遇到的注意点:
1、切分字符串的函数不是split,而是regexp_split
2、Phoenix中,数组的索引是从1开始的
3、给字段起别名之后的嵌套查询,就不需要再加双引号了,主键本身就可以不用加
4、sum函数中的数据类型必须是数值类型,如果是10的整数倍,会以科学计数法进行标识 580->5.8E+2(5.8*10^2)
5、to_number()转数值  to_char()转字符串

四  bulkLoad实现批量导入

4.1相关概念

1.优点:

  1. 如果我们一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源, 一个比较高效便捷的方法就是使用 “Bulk Loading”方法,即HBase提供的HFileOutputFormat类。

  2. 它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接生成这种hdfs内存储的数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载。

2.限制:

  1. 仅适合初次数据导入,即表内数据为空,或者每次入库表内都无数据的情况。

  2. HBase集群与Hadoop集群为同一集群,即HBase所基于的HDFS为生成HFile的MR的集群

4.2 编写代码

代码编写:

提前在Hbase中创建好表

生成Hfile基本流程:

  1. 设置Mapper的输出KV类型:

    K: ImmutableBytesWritable(代表行键)

    V: KeyValue (代表cell)

2. 开发Mapper

读取你的原始数据,按你的需求做处理

输出rowkey作为K,输出一些KeyValue(Put)作为V

3. 配置job参数

a. Zookeeper的连接地址

b. 配置输出的OutputFormat为HFileOutputFormat2,并为其设置参数

4. 提交job

导入HFile到RegionServer的流程

构建一个表描述对象

构建一个region定位工具

然后用LoadIncrementalHFiles来doBulkload操作

package com.shujia.jinjie;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.FileInputStream;
import java.io.IOException;

class MyBulkLoadMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable,KeyValue>{
       /*
    ==> tl_hefei_shushan_503.txt <==
    D55433A437AEC8D8D3DB2BCA56E9E64392A9D93C	117210031795040	83401	8340104	301	20180503190539	20180503233517	20180503
    8827F3196977C6F752680505FEC0C7D3A18D4DFC	\N	\N	\N	\N	\N	\N	\N
     */

    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        //处理脏数据
        if (!(line.contains("shushan")) && !(line.contains("\\N"))){
            String[] strings = line.split("\t");
            String phoneNum = strings[0];
            String wg = strings[1];
            String city = strings[2];
            String qx = strings[3];
            String stayTime = strings[4];
            String startTime = strings[5];
            String endTime = strings[6];
            String date = strings[7];

            //因为手机号代表一个人,一个人可能会出现在多个区域,不能直接将手机号作为rk,为了数据的完整性
            //可以将手机号和进入网格的时间拼接
            byte[] rk = Bytes.toBytes(phoneNum + "-" + startTime);
            byte[] family = Bytes.toBytes("info");

            //创建输出key对象
//            ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(rk);
//            ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(rk);
            ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(rk);
            //将其余的6列封装成单元格
            KeyValue keyValue1 = new KeyValue(rk, family, Bytes.toBytes("wg"), Bytes.toBytes(wg));
            context.write(immutableBytesWritable,keyValue1);

            KeyValue keyValue2 = new KeyValue(rk, family, Bytes.toBytes("city"), Bytes.toBytes(city));
            context.write(immutableBytesWritable,keyValue2);

            KeyValue keyValue3 = new KeyValue(rk, family, Bytes.toBytes("qx"), Bytes.toBytes(qx));
            context.write(immutableBytesWritable,keyValue3);

            KeyValue keyValue4 = new KeyValue(rk, family, Bytes.toBytes("stayTime"), Bytes.toBytes(stayTime));
            context.write(immutableBytesWritable,keyValue4);

            KeyValue keyValue5 = new KeyValue(rk, family, Bytes.toBytes("endTime"), Bytes.toBytes(endTime));
            context.write(immutableBytesWritable,keyValue5);

            KeyValue keyValue6 = new KeyValue(rk, family, Bytes.toBytes("date"), Bytes.toBytes(date));
            context.write(immutableBytesWritable,keyValue6);

        }

    }
}

public class BulkLoadingDemo {
    public static void main(String[] args) throws Exception{
        //获取集群配置文件,可以是hdfs集群也可以是hbase集群
        Configuration conf = HBaseConfiguration.create();
        //设置zookeeper集群信息
        conf.set("hbase.zookeeper.quorum","master:2181,node1:2181,node2:2181");

        //创建Job作业
        Job job = Job.getInstance(conf);

        //给job作业起一个名字
        job.setJobName("hbase使用bulkloading方式批量加载数据作业");

        //设置主类
        job.setJarByClass(BulkLoadingDemo.class);
        //设置Map类
        job.setMapperClass(MyBulkLoadMapper.class);

        //无需设置自定义reduce类,使用hbase中的reduce类
        job.setOutputFormatClass(HFileOutputFormat2.class);

        //设置map的输出key和value类型
        //key -- 行键
        //value -- 单元格(列)
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);

        //设置region中字典排序的过程的类
        job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
        job.setReducerClass(KeyValueSortReducer.class);

        //设置文件的输入输出路径
        FileInputFormat.setInputPaths(job,new Path("/bigdata29/data/dianxin_data"));
        //设置hfile文件
        FileOutputFormat.setOutputPath(job,new Path("/bigdata29/hbase/out2"));

        //获取数据库连接对象
        Connection conn = ConnectionFactory.createConnection(conf);
        //获取数据库操作对象
        Admin admin = conn.getAdmin();
        TableName tn = TableName.valueOf("dianxin_data_bulk");
        Table table = conn.getTable(tn);
        RegionLocator regionLocator = conn.getRegionLocator(tn);

        //使用HFileOutputFormat2类创建Hfile文件
        HFileOutputFormat2.configureIncrementalLoad(job,table,regionLocator);

        //提交作业并执行
        boolean b = job.waitForCompletion(true);
        if(b){
            System.out.println("====================== Hfile文件生成成功!! 在/bigdata29/hbase/out2目录下 ================================");
        }else {
            System.out.println("============= Hfile文件生成失败!! ==================");
        }



    }






}

 这个命令在Linux运行

hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles /bigdata29/hbase/out2 dianxin_data_bulk

五 HBase的rowKey设计

5.1.相关概念

HBase是三维有序存储的,通过rowkey(行键),column key(column family和qualifier)和TimeStamp(时间戳)这个三个维度可以对HBase中的数据进行快速定位。

HBase中rowkey可以唯一标识一行记录,在HBase查询的时候,有两种方式:

通过get方式,指定rowkey获取唯一一条记录

通过scan方式,设置startRow和stopRow参数进行范围匹配

全表扫描,即直接扫描整张表中所有行记录

5.2.相关原则

1.长度不宜过长

rowkey是一个二进制码流,可以是任意字符串,最大长度 64kb ,实际应用中一般为10-100bytes,以 byte[] 形式保存,一般设计成定长。

建议越短越好,不要超过16个字节,原因如下:

数据的持久化文件HFile中是按照KeyValue存储的,如果rowkey过长,比如超过100字节,1000w行数据,光rowkey就要占用100*1000w=10亿个字节,将近1G数据,这样会极大影响HFile的存储效率;

MemStore将缓存部分数据到内存,如果rowkey字段过长,内存的有效利用率就会降低,系统不能缓存更多的数据,这样会降低检索效率。

目前操作系统都是64位系统,内存8字节对齐,控制在16个字节,8字节的整数倍利用了操作系统的最佳特性。

2.散列性

如果rowkey按照时间戳的方式递增,不要将时间放在二进制码的前面,建议将rowkey的高位作为散列字段,由程序随机生成,低位放时间字段,这样将提高数据均衡分布在每个RegionServer,以实现负载均衡的几率。如果没有散列字段,首字段直接是时间信息,所有的数据都会集中在一个RegionServer上,这样在数据检索的时候负载会集中在个别的RegionServer上,造成热点问题,会降低查询效率。

3.唯一性

必须在设计上保证其唯一性,rowkey是按照字典顺序排序存储的,因此,设计rowkey的时候,要充分利用这个排序的特点,将经常读取的数据存储到一块,将最近可能会被访问的数据放到一块。

5.3 设计原则

1.什么是热点

HBase中的行是按照rowkey的字典顺序排序的,这种设计优化了scan操作,可以将相关的行以及会被一起读取的行存取在临近位置,便于scan。然而糟糕的rowkey设计是热点的源头。 热点发生在大量的client直接访问集群的一个或极少数个节点(访问可能是读,写或者其他操作)。大量访问会使热点region所在的单个机器超出自身承受能力,引起性能下降甚至region不可用,这也会影响同一个RegionServer上的其他region,由于主机无法服务其他region的请求。 设计良好的数据访问模式以使集群被充分,均衡的利用。

为了避免写热点,设计rowkey使得不同行在同一个region,但是在更多数据情况下,数据应该被写入集群的多个region,而不是一个。

下面是一些常见的避免热点的方法以及它们的优缺点:

加盐

这里所说的加盐不是密码学中的加盐,而是在rowkey的前面增加随机数,具体就是给rowkey分配一个随机前缀以使得它和之前的rowkey的开头不同。分配的前缀种类数量应该和你想使用数据分散到不同的region的数量一致。加盐之后的rowkey就会根据随机生成的前缀分散到各个region上,以避免热点。

哈希

哈希会使同一行永远用一个前缀加盐。哈希也可以使负载分散到整个集群,但是读却是可以预测的。使用确定的哈希可以让客户端重构完整的rowkey,可以使用get操作准确获取某一个行数据

反转

第三种防止热点的方法时反转固定长度或者数字格式的rowkey。这样可以使得rowkey中经常改变的部分(最没有意义的部分)放在前面。这样可以有效的随机rowkey,但是牺牲了rowkey的有序性。

反转rowkey的例子以手机号为rowkey,可以将手机号反转后的字符串作为rowkey,这样的就避免了以手机号那样比较固定开头导致热点问题

时间戳反转

一个常见的数据处理问题是快速获取数据的最近版本,使用反转的时间戳作为rowkey的一部分对这个问题十分有用,可以用 Long.Max_Value - timestamp 追加到key的末尾,例如 [key]reverse_timestamp , [key] 的最新值可以通过scan [key]获得[key]的第一条记录,因为HBase中rowkey是有序的,第一条记录是最后录入的数据。

示例

# 原数据:以时间戳_user_id作为rowkey
# 时间戳高位变化不大,太连续,最终可能会导致热点问题
1638584124_user_id
1638584135_user_id
1638584146_user_id
1638584157_user_id
1638584168_user_id
1638584179_user_id
...

# 解决方案:加盐、反转、哈希

# 加盐
# 加上随即前缀,随机的打散
# 该过程无法预测 前缀时随机的
00_1638584124_user_id
05_1638584135_user_id
03_1638584146_user_id
04_1638584157_user_id
02_1638584168_user_id
06_1638584179_user_id

# 反转
# 适用于高位变化不大,低位变化大的rowkey
4214858361_user_id
5314858361_user_id
6414858361_user_id
7514858361_user_id
8614858361_user_id
9714858361_user_id

# 散列 md5、sha1、sha256......
25531D7065AE158AAB6FA53379523979_user_id
60F9A0072C0BD06C92D768DACF2DFDC3_user_id
D2EFD883A6C0198DA3AF4FD8F82DEB57_user_id
A9A4C265D61E0801D163927DE1299C79_user_id
3F41251355E092D7D8A50130441B58A5_user_id
5E6043C773DA4CF991B389D200B77379_user_id

# 时间戳"反转"
# rowkey:时间戳_user_id
# rowkey是字典升序的,那么越新的记录会被排在最后面,不容易被获取到
# 需求:让最新的记录排在最前面

# 大数:9999999999
# 大数-小数

1638584124_user_id => 8361415875_user_id
1638584135_user_id => 8361415864_user_id
1638584146_user_id => 8361415853_user_id
1638584157_user_id => 8361415842_user_id
1638584168_user_id => 8361415831_user_id
1638584179_user_id => 8361415820_user_id

1638586193_user_id => 8361413806_user_id

5.4实战案例

电信案例

要求

手机号,网格编号,城市编号,区县编号,停留时间,进入时间,离开时间,时间分区
D55433A437AEC8D8D3DB2BCA56E9E64392A9D93C,117210031795040,83401,8340104,301,20180503190539,20180503233517,20180503


将用户位置数据保存到hbase
    查询需求
        1、通过手机号查询用户最近10条位置记录

        2、获取用户某一天在一个城市中的所有位置

    怎么设计hbase表
        1、rowkey
        2、时间戳

代码

package com.shujia.jinjie;


import com.shujia.utils.HBaseUtils;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Scanner;

/**
 * 查询需求
 *         1、通过手机号查询用户最近10条位置记录
 *
 *         2、获取用户某一天在一个城市中的所有位置
 */
public class DianXinRowKeyDemo {

    //创建数据库连接对象
    //创建数据库操作对象
    private static Connection conn = HBaseUtils.CONNECTION;
    private static Admin admin = HBaseUtils.ADMIN;
    public static void main(String[] args) throws Exception{

        //建表
//        HBaseUtils.createOneTable("dianxin_tb1","info");

        //读取文件数据,添加到hbase中的dianxin_tb1表中
//        putsData();

        //通过手机号查询用户最近10条位置记录
        Scanner sc = new Scanner(System.in);
        System.out.println("请输入要查询的手机号:");
        String phoneNum = sc.next();
//        findUser(phoneNum);

        //2、获取用户某一天在一个城市中的所有位置
        System.out.println("请输入要查询的城市编号:");
        String city = sc.next();
        findUserPosition(phoneNum,city);


    }

    public static void findUserPosition(String number,String city) throws Exception{
        //将表封装成一个TableName对象
        TableName tn = TableName.valueOf("dianxin_tb1");

        //获取表对象
        Table table = conn.getTable(tn);

        //scan
        Scan scan = new Scan();
        //创建一个行键前缀过滤器对象
        //public PrefixFilter(byte[] prefix)
        PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes(number));

        //过滤城市
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("city"),
                CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes(city)));

        //创建过滤器集合
        FilterList filterList = new FilterList();
        filterList.addFilter(prefixFilter);
        filterList.addFilter(singleColumnValueFilter);

        scan.setFilter(filterList);
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            String rk = Bytes.toString(result.getRow());
            //47BE1E866CFC071DB19D5E1C056BE28AE24C16E7-9496768070
            String[] strings = rk.split("-");
            String phoneNum = strings[0];
            long startTime = 20190000000000L - Long.parseLong(strings[1]);
            String wg = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("wg")));
            String city2 = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("city")));
            String qx = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("qx")));
            String stayTime = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("stayTime")));
            String endTime = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("endTime")));
            String date = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("date")));

            System.out.println("手机号:"+phoneNum+",进入网格时间:"+startTime+",离开网格时间:"+endTime
                    +",网格编号:"+wg+",城市:"
                    +city2+",区县:"+qx+",停留时间:"
                    +stayTime+",日期:"+date);
        }

    }

    //查询函数
    public static void findUser(String number) throws Exception{
        //将表封装成一个TableName对象
        TableName tn = TableName.valueOf("dianxin_tb1");

        //获取表对象
        Table table = conn.getTable(tn);

        //scan
        Scan scan = new Scan();
        //创建一个行键前缀过滤器对象
        //public PrefixFilter(byte[] prefix)
        PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes(number));
        //设置过滤器
        //public Scan setFilter(Filter filter)
        scan.setFilter(prefixFilter);
        scan.setLimit(10);
        //创建结果对象
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            String rk = Bytes.toString(result.getRow());
            //47BE1E866CFC071DB19D5E1C056BE28AE24C16E7-9496768070
            String[] strings = rk.split("-");
            String phoneNum = strings[0];
            long startTime = 20190000000000L - Long.parseLong(strings[1]);
            String wg = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("wg")));
            String city = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("city")));
            String qx = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("qx")));
            String stayTime = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("stayTime")));
            String endTime = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("endTime")));
            String date = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("date")));

            System.out.println("手机号:"+phoneNum+",进入网格时间:"+startTime+",离开网格时间:"+endTime
                    +",网格编号:"+wg+",城市:"
                    +city+",区县:"+qx+",停留时间:"
                    +stayTime+",日期:"+date);
        }

    }



    //添加函数
    public static void putsData() throws Exception{


        //将表封装成一个TableName对象
        TableName tn = TableName.valueOf("dianxin_tb1");

        //获取表对象
        Table table = conn.getTable(tn);

        //获取IO对象
        BufferedReader br = new BufferedReader(new FileReader("hbase/data/dianxin_data"));
        String line=null;
        while ((line= br.readLine())!=null){
            if (!(line.contains("shushan")) && !(line.contains("\\N"))){
                String[] strings = line.split("\t");
                String phoneNum = strings[0];
                String wg = strings[1];
                String city = strings[2];
                String qx = strings[3];
                String stayTime = strings[4];
                String startTime = strings[5];
                String endTime = strings[6];
                String date = strings[7];

                //我们为了需求的查询更加方便,以手机号与进入网格的时间进行拼接
                //因为要查询最近的,可以使用时间戳反转对rowkey进行设计
                //20180503173254
                //20190000000000
                long new_time = 20190000000000L - Long.parseLong(startTime);
                String rk = phoneNum + "-" + new_time;
                HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","wg",wg);
                HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","city",city);
                HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","qx",qx);
                HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","stayTime",stayTime);
                HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","endTime",endTime);
                HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","date",date);


            }

        }


    }
}

六 二级索引

6.1相关概念

二级索引的本质就是建立各列值与行键之间的映射关系

Hbase的局限性:

  HBase本身只提供基于行键和全表扫描的查询,而行键索引单一,对于多维度的查询困难。

6.2 常见的二级索引

HBase的一级索引就是rowkey,我们只能通过rowkey进行检索。如果我们相对hbase里面列族的列列进行一些组合查询,就需要采用HBase的二级索引方案来进行多条件的查询。

  1. MapReduce方案

  2. ITHBASE(Indexed-Transanctional HBase)方案

  3. IHBASE(Index HBase)方案

  4. Hbase Coprocessor(协处理器)方案

  5. Solr+hbase方案 redis+hbase 方案

  6. CCIndex(complementalclustering index)方案

6.3 二级索引的种类

1、创建单列索引

  2、同时创建多个单列索引

  3、创建联合索引(最多同时支持3个列)

  4、只根据rowkey创建索引

6.4 Mapreduce 创建二级索引

使用整合MapReduce的方式创建hbase索引。主要的流程如下:

1.1扫描输入表,使用hbase继承类TableMapper

1.2获取rowkey和指定字段名称和字段值

1.3创建Put实例, value=” “, rowkey=班级,column=学号

1.4使用TableReducer将数据写入索引表

首先二级索引表必须先要存在

该Map继承的是TableMapper

该Reducer继承的是TableReducer

代码如下

package com.shujia.jinjie;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

//因为我们现在要读取的数据来自于hbase中的hfile文件,与hdfs上普通的block块文件有所区别,不能直接继承Mapper类
//要继承hbase读取数据专属的Mapper类     TableMapper
//public abstract class TableMapper<KEYOUT, VALUEOUT> extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT>

class MyIndexMapper extends TableMapper<Text,NullWritable>{
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        //ImmutableBytesWritable key --相当于是读取到一行的行键
        //Result value --相当于读取到一行多列的封装
        //获取行键,

        String id = Bytes.toString(key.get());
        //获取姓名列
        // public byte[] getValue(byte[] family, byte[] qualifier)
        String name = Bytes.toString(value.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
        //将姓名与学号连接起来给到reduce
        context.write(new Text(id+"-"+name),NullWritable.get());


    }
}


//public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation>
class MyIndexReducer extends TableReducer<Text,NullWritable,NullWritable>{
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, NullWritable, Mutation>.Context context) throws IOException, InterruptedException {
        String string = key.toString();
        String id = string.split("-")[0];
        String name = string.split("-")[1];

        //将要添加的数据封装成Put类的对象
        Put put = new Put(Bytes.toBytes(name));
        //public Put addImmutable(byte[] family, byte[] qualifier, byte[] value)
        put.addColumn(Bytes.toBytes("info"),Bytes.toBytes(id),Bytes.toBytes(""));
        context.write(NullWritable.get(),put);

    }
}

public class HBaseIndexDemo1 {
    public static void main(String[] args) throws Exception {
        //获取集群配置文件,可以是hdfs集群也可以是hbase集群
        Configuration conf = HBaseConfiguration.create();
        //设置zookeeper集群信息
        conf.set("hbase.zookeeper.quorum","master:2181,node1:2181,node2:2181");
        //创建job对象
        Job job = Job.getInstance(conf);
        //给job任务取名字
        job.setJobName("给学生表创建二级索引表");

        //设置主类
        job.setJarByClass(HBaseIndexDemo1.class);

        //因为索引表的构建是建立列值与行键的映射关系,要获取所有的数据
        //scan扫描全表数据
        Scan scan = new Scan();
        //告诉输入的列值来自于哪一个列簇
        scan.addFamily(Bytes.toBytes("info"));




        /**
        * @param table  The table name to read from.
                * @param scan  The scan instance with the columns, time range etc.
         * @param mapper  The mapper class to use.
         * @param outputKeyClass  The class of the output key.
         * @param outputValueClass  The class of the output value.
         * @param job  The current job to adjust.  Make sure the passed job is
         * carrying all necessary HBase configuration.
                * @throws IOException When setting up the details fails.
                *public static void initTableMapperJob
         * (String table,Scan scan,Class<? extends TableMapper> mapper,Class<?> outputKeyClass,Class<?> outputValueClass,Job job)
         */
        //初始化Map任务
        TableMapReduceUtil.initTableMapperJob("students2",scan,MyIndexMapper.class, Text.class, NullWritable.class,job);
        //初始化Reduce任务
        TableMapReduceUtil.initTableReducerJob("students2_index",MyIndexReducer.class,job);
        //提交作业到集群中允许
        boolean b = job.waitForCompletion(true);
        if (b) {
            System.out.println("================== students2索引表构建成功!!!============================");
        } else {
            System.out.println("================== students2索引表构建失败!!!============================");
        }

    }
}

6.5 Redis创建二级索引

1.指定redis端口号

Jedis jedis = new Jedis("192.168.73.100", 12346);

2.在redis中构建映射关系(性别:学号)因为男生的学号有很多个,且不重复,所以我们在redis中采用set的数据类型存储

    public static void buildIndexInRedis() throws Exception {

        //获取要构建索引的原表
        Table students2 = conn.getTable(TableName.valueOf("students2"));

        Scan scan = new Scan();
        //获取男生的学号,放入到redis中
        //创建列值过滤器
        ValueFilter filter1 = new ValueFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("男")));
        scan.setFilter(filter1);
        ResultScanner resultScanner = students2.getScanner(scan);
        for (Result result : resultScanner) {
            //获取每一行的行键即可
            String id = Bytes.toString(result.getRow());
            //将学号以值的方式添加到redis键对应的值中
            //因为男生的学号有很多个,且不重复,所以我们在redis中采用set的数据类型存储
            jedis.sadd("性别:男", id);
        }

        //获取男生的学号,放入到redis中
        //创建列值过滤器
        ValueFilter filter2 = new ValueFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("女")));
        scan.setFilter(filter2);
        ResultScanner resultScanner2 = students2.getScanner(scan);
        for (Result result : resultScanner2) {
            //获取每一行的行键即可
            String id = Bytes.toString(result.getRow());
            //将学号以值的方式添加到redis键对应的值中
            //因为男生的学号有很多个,且不重复,所以我们在redis中采用set的数据类型存储
            jedis.sadd("性别:女", id);
        }

3.先通过查询redis中性别对应的学号,拿着学号去hbase原表中查询获取结果

package com.shujia.jinjie;

import com.shujia.utils.HBaseUtils;
import com.shujia.utils.HBaseUtils;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import redis.clients.jedis.Jedis;

import java.util.Scanner;
import java.util.Set;

/*
    使用redis第三方的存储工具存储hbase索引(本质依旧是列值与行键产生映射关系)
 */
public class HBaseWithRedisIndex {
    //1、获取hbase数据库连接对象和操作对象
    static Connection conn = HBaseUtils.CONNECTION;
    static Admin admin = HBaseUtils.ADMIN;

    //获取redis连接对象
    static Jedis jedis = new Jedis("192.168.73.100", 12346);

    public static void main(String[] args) throws Exception {
        //步骤1:在redis中构建映射关系(性别:学号)
//        buildIndexInRedis();

      //使用:先通过查询redis中性别对应的学号,拿着学号去hbase原表中查询获取结果
        Scanner sc = new Scanner(System.in);
        System.out.println("请输入您要查询的性别:");
        String gender = sc.next();
        selectGenderFromHbase(gender);
    }

    public static void selectGenderFromHbase(String gender) throws Exception {
        if ("男".equals(gender)) {
            selectIdFromRedis(gender);
        } else if ("女".equals(gender)) {
            selectIdFromRedis(gender);
        } else {
            System.out.println("没有该性别");
        }
    }

    //单独编写一个方法查询redis
    public static void selectIdFromRedis(String gender) throws Exception {
        Table students2 = conn.getTable(TableName.valueOf("students2"));
        //redis中set查询值的方法
        Set<String> ids = jedis.smembers("性别:"+gender);
        for (String id : ids) {
//            Result result = students2.get(new Get(Bytes.toBytes(id)).addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")));
            Get get = new Get(Bytes.toBytes(id));
            //获取那一列的所有信息封装成一个结果对象
            get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
            Result result = students2.get(get);

            String name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
            System.out.println("学号:" + id + ",姓名:" + name);
        }
    }

    public static void buildIndexInRedis() throws Exception {

        //获取要构建索引的原表
        Table students2 = conn.getTable(TableName.valueOf("students2"));

        Scan scan = new Scan();
        //获取男生的学号,放入到redis中
        //创建列值过滤器
        ValueFilter filter1 = new ValueFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("男")));
        scan.setFilter(filter1);
        ResultScanner resultScanner = students2.getScanner(scan);
        for (Result result : resultScanner) {
            //获取每一行的行键即可
            String id = Bytes.toString(result.getRow());
            //将学号以值的方式添加到redis键对应的值中
            //因为男生的学号有很多个,且不重复,所以我们在redis中采用set的数据类型存储
            jedis.sadd("性别:男", id);
        }

        //获取男生的学号,放入到redis中
        //创建列值过滤器
        ValueFilter filter2 = new ValueFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("女")));
        scan.setFilter(filter2);
        ResultScanner resultScanner2 = students2.getScanner(scan);
        for (Result result : resultScanner2) {
            //获取每一行的行键即可
            String id = Bytes.toString(result.getRow());
            //将学号以值的方式添加到redis键对应的值中
            //因为男生的学号有很多个,且不重复,所以我们在redis中采用set的数据类型存储
            jedis.sadd("性别:女", id);
        }


    }
}

6.6 Phoenix二级索引

对于Hbase,如果想精确定位到某行记录,唯一的办法就是通过rowkey查询。如果不通过rowkey查找数据,就必须逐行比较每一行的值,对于较大的表,全表扫描的代价是不可接受的。

1.开启索引,配置相关文件

# 关闭hbase集群
stop-hbase.sh

# 在/usr/local/soft/hbase-1.4.6/conf/hbase-site.xml中增加如下配置

<property>
  <name>hbase.regionserver.wal.codec</name>
  <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>
<property>
    <name>hbase.rpc.timeout</name>
    <value>60000000</value>
</property>
<property>
    <name>hbase.client.scanner.timeout.period</name>
    <value>60000000</value>
</property>
<property>
    <name>phoenix.query.timeoutMs</name>
    <value>60000000</value>
</property>


# 同步到所有节点
scp hbase-site.xml node1:`pwd`
scp hbase-site.xml node2:`pwd`

# 修改phoenix目录下的bin目录中的hbase-site.xml
<property>
    <name>hbase.rpc.timeout</name>
    <value>60000000</value>
</property>
<property>
    <name>hbase.client.scanner.timeout.period</name>
    <value>60000000</value>
</property>
<property>
    <name>phoenix.query.timeoutMs</name>
    <value>60000000</value>
</property>


# 启动hbase
start-hbase.sh
# 重新进入phoenix客户端
sqlline.py master,node1,node2

6.6.1 全局索引

全局索引适合读多写少的场景。如果使用全局索引,读数据基本不损耗性能,所有的性能损耗都来源于写数据。数据表的添加、删除和修改都会更新相关的索引表(数据删除了,索引表中的数据也会删除;数据增加了,索引表的数据也会增加)

注意: 对于全局索引在默认情况下,在查询语句中检索的列如果不在索引表中,Phoenix不会使用索引表将,除非使用hint。

1.导入数据

首先创建一个shell命令,里面有建表语句,取名为DIANXIN.sql

CREATE TABLE IF NOT EXISTS DIANXIN (
     mdn VARCHAR ,
     start_date VARCHAR ,
     end_date VARCHAR ,
     county VARCHAR,
     x DOUBLE ,
     y  DOUBLE,
     bsid VARCHAR,
     grid_id  VARCHAR,
     biz_type VARCHAR, 
     event_type VARCHAR , 
     data_source VARCHAR ,
     CONSTRAINT PK PRIMARY KEY (mdn,start_date)
) column_encoded_bytes=0;

2.准备数据

DIANXIN.csv

3.输入命令

# 导入数据,Linux命令
psql.py master,node1,node2 DIANXIN.sql DIANXIN.csv

4.导入数据好的截图

5.创建全局索引
CREATE INDEX DIANXIN_INDEX ON DIANXIN ( end_date );

6 查询数据 ( 索引未生效)
select * from DIANXIN where end_date = '20180503154014';

很明显查询需要2秒多,

7  强制使用索引 (索引生效) hint  语法糖

语法糖:/*+ INDEX(DIANXIN DIANXIN_INDEX) */
select /*+ INDEX(DIANXIN DIANXIN_INDEX) */  * from DIANXIN where end_date = '20180503154014';

很明显查询零点几秒,加了语法糖,查询速度加快

8 取索引列,(索引生效)

取我们设置的单级索引列,速度一样快
select end_date from DIANXIN where end_date = '20180503154014';

9.#创建多列索引
CREATE INDEX DIANXIN_INDEX1 ON DIANXIN ( end_date,COUNTY );

取我们多级索引的某一个索引列单独查询速度很快的

10 查询所有列 (索引未生效)
select  * from DIANXIN where end_date = '20180503154014'  and COUNTY = '8340104';

这个语法糖写错了,就当没有写,查询需要1秒多

11 查询所有列 (索引生效)
select /*+ INDEX(DIANXIN DIANXIN_INDEX1) */ * from DIANXIN where end_date = '20180503154014' and COUNTY = '8340104';

加了语法糖,只要零点几秒

12  单条件  (索引未生效)
select end_date from DIANXIN where  COUNTY = '8340103';
# 单条件  (索引生效) end_date 在前
select COUNTY from DIANXIN where end_date = '20180503154014';

因为创建多级索引是 CREATE INDEX DIANXIN_INDEX1 ON DIANXIN ( end_date,COUNTY )

end_date 在前面

13 删除索引
drop index 索引名 on DIANXIN;
drop index DIANXIN_INDEX on DIANXIN;

6.6.2 本地索引

1.相关概念

本地索引适合写多读少的场景,或者存储空间有限的场景。和全局索引一样,Phoenix也会在查询的时候自动选择是否使用本地索引。本地索引因为索引数据和原数据存储在同一台机器上,避免网络数据传输的开销,所以更适合写多的场景。由于无法提前确定数据在哪个Region上,所以在读数据的时候,需要检查每个Region上的数据从而带来一些性能损耗。

注意:对于本地索引,查询中无论是否指定hint或者是查询的列是否都在索引表中,都会使用索引表。

 2.创建本地索引
CREATE LOCAL INDEX DIANXIN_LOCAL_IDEX ON DIANXIN(grid_id);

3. 索引生效
select grid_id from dianxin where grid_id='117285031820040';

索引生效
select * from dianxin where grid_id='117285031820040';

6.6.3 覆盖索引

1.相关概念

覆盖索引是把原数据存储在索引数据表中,这样在查询时不需要再去HBase的原表获取数据就,直接返回查询结果。

注意:查询是 select 的列和 where 的列都需要在索引中出现。

2.创建覆盖索引
CREATE INDEX DIANXIN_INDEX_COVER ON DIANXIN ( x,y ) INCLUDE ( county );

3.

# 查询所有列 (索引未生效)
select * from DIANXIN where x=117.288 and y =31.822;

# 强制使用索引 (索引生效)
select /*+ INDEX(DIANXIN DIANXIN_INDEX_COVER) */ * from DIANXIN where x=117.288 and y =31.822;

# 查询索引中的列 (索引生效) mdn是DIANXIN表的RowKey中的一部分
select x,y,county from DIANXIN where x=117.288 and y =31.822;
select mdn,x,y,county from DIANXIN where x=117.288 and y =31.822;

# 查询条件必须放在索引中  select 中的列可以放在INCLUDE (将数据保存在索引中)
select /*+ INDEX(DIANXIN DIANXIN_INDEX_COVER) */ x,y,count(*) from DIANXIN group by x,y;

6.7 Phoenix JCBC

1.导入依赖

        <!-- https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-hbase-compat-2.2.5 -->
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-hbase-compat-2.2.5</artifactId>
            <version>5.1.3</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-core -->
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-core</artifactId>
            <version>5.1.3</version>
        </dependency>

        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.2</version>
        </dependency>

代码

package com.shujia.jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class PhoenixJDBC {
    public static void main(String[] args) throws Exception {
        //注册驱动
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");

        //创建与phoenix连接对象
        Connection conn = DriverManager.getConnection("jdbc:phoenix:master,node1,node2:2181");

        PreparedStatement prep = conn.prepareStatement("select /*+ INDEX(DIANXIN DIANXIN_INDEX) */  * from DIANXIN where end_date = ? and start_date = ?");

        prep.setString(1,"20180503154014");
        prep.setString(2,"20180503154614");

        ResultSet resultSet = prep.executeQuery();
        while (resultSet.next()){
            /*
                 mdn VARCHAR ,
                 start_date VARCHAR ,
                 end_date VARCHAR ,
                 county VARCHAR,
                 x DOUBLE ,
                 y  DOUBLE,
                 bsid VARCHAR,
                 grid_id  VARCHAR,
                 biz_type VARCHAR,
                 event_type VARCHAR ,
                 data_source VARCHAR ,
             */
            String mdn = resultSet.getString("mdn");
            String start_date = resultSet.getString("start_date");
            String end_date = resultSet.getString("end_date");
            String county = resultSet.getString("county");
            double x = resultSet.getDouble("x");
            double  y = resultSet.getDouble("y");
            String bsid = resultSet.getString("bsid");
            String grid_id = resultSet.getString("grid_id");
            String biz_type = resultSet.getString("biz_type");
            String event_type = resultSet.getString("event_type");
            String data_source = resultSet.getString("data_source");

            System.out.println(mdn+","+start_date+","+end_date+","+county+","+x+","+y+","+bsid
                    +","+grid_id+","+biz_type+","+event_type+","+data_source);
        }

        //释放资源
        prep.close();
        conn.close();

    }
}

七 HBase的调优

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/588778.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C语言:循环结构

循环结构 1. for循环概念举例示例结果分析 补充 2. while循环概念举例示例结果分析补充 3. do-while循环概念举例示例结果分析 补充 4.循环控制举例示例结果分析 C语言中的循环结构是一种重要的编程构造&#xff0c;它允许我们重复执行一段代码&#xff0c;直到满足某个条件为止…

Hive优化以及相关参数设置

1.表层面设计优化 1.1 表分区 分区表实际上就是对应一个 HDFS 文件系统上的独立的文件夹&#xff0c;该文件夹下是该分区所有的数据文件。Hive 中的分区就是分目录&#xff0c;把一个大的数据集根据业务需要分割成小的数据集。在查询时通过 WHERE 子句中的表达式选择查询所需要…

Angular基础-搭建Angular运行环境

这篇文章介绍了在Angular项目中进行开发环境搭建的关键步骤。包括node.js安装和配置、安装Angular CLI工具、安装angular-router、创建Angular项目等步骤。这篇文章为读者提供了清晰的指南&#xff0c;帮助他们快速搭建Angular开发环境&#xff0c;为后续的项目开发奠定基础。 …

Python中动画显示与gif生成

1. 动画生成 主要使用的是 matplotlib.animation &#xff0c;具体示例如下&#xff1a; import matplotlib.pyplot as plt import matplotlib.animation as animation import numpy as np fig, ax plt.subplots() t np.linspace(0, 3, 40) g -9.81 v0 12 z g * t**2 / …

【项目学习01_2024.05.02_Day04】

学习笔记 4 课程分类查询4.1需求分析4.2 接口定义4.3 接口开发4.3.1 树型表查询4.3.2 开发Mapper 4 课程分类查询 4.1需求分析 有课程分类的需求 course_category课程分类表的结构 这张表是一个树型结构&#xff0c;通过父结点id将各元素组成一个树。 利用mybatis-plus-gen…

第十五届蓝桥杯Java软件开发大学B组自我经验小结

自我介绍 23届大一 双非 计院科班 软件工程 江苏人在吉林上大学 Java蒟蒻 在学校的宣传下 有幸参加了第十五届蓝桥杯Java大学b组省赛 蓝桥杯说明 就是一个算法比赛吧 考试时间9.00到1.00 四小时 带准考证和身份证和笔 草稿纸会发 赛制是IOC就是不会给任何反馈 就是你…

IDEA 创建Servlet-HelloWorldServlet

servlet 1.创建空项目2.配置web项目3.配置Tomcat4.加载Tomcat包5.创建HelloWorldServlet类6.配置web.xml7.运行get与post请求 1.创建空项目 2.配置web项目 3.配置Tomcat 4.加载Tomcat包 5.创建HelloWorldServlet类 public class controller extends HttpServlet {Override//get…

java入门-包装类

包装类 Java语言是一个面向对象的语言&#xff0c;但是Java中的基本数据类型却是不面向对象的。基本类型的数据不具备"对象"的特性&#xff08;没有属性和方法可以调用&#xff09;&#xff0c;因此&#xff0c;java为每种数据类型分别设计了对应的类&#xff0c;即*…

《与 Apollo 共创生态——Apollo7周年大会干货分享》

&#x1f308;个人主页: Aileen_0v0 &#x1f525;热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法 ​&#x1f4ab;个人格言:“没有罗马,那就自己创造罗马~” 文章目录 阿波罗X企业自动驾驶解决方案自动驾驶技术提升与挑战自动驾驶系统功能与性能的详细解析<td alig…

在智慧城市的建设中智能车载监控录像机发挥什么作用

引言 随着科技的快速发展&#xff0c;智慧城市的概念逐渐深入人心。在智慧城市的建设中&#xff0c;智能车载监控录像机作为一种重要的技术工具&#xff0c;发挥着越来越重要的作用。本文将从多个方面探讨智能车载监控录像机在智慧城市建设中的作用。 一、智能车载监控录像机概…

纯血鸿蒙APP实战开发——主页瀑布流实现

介绍 本示例介绍使用ArkUIWaterFlow组件和LazyForEach实现瀑布流场景。该场景多用于购物、资讯类应用。 效果图预览 使用说明 加载完成后显示整个列表&#xff0c;超过一屏时可以上下滑动。 实现思路 创建WaterFlowDataSource类&#xff0c;实现IDataSource接口的对象&…

C语言/数据结构——每日一题(链表的中间节点)

一.前言 今天我在LeetCode刷到了一道单链表题&#xff0c;想着和大家分享一下这道题&#xff1a;https://leetcode.cn/problems/middle-of-the-linked-list。废话不多说让我们开始今天的知识分享吧。 二.正文 1.1题目描述 1.2题目分析 这道题有一个非常简便的方法——快慢指…

Vue+Element UI el-progress进度条内显示自定义数字及文字

需求 进度条内展示 具体的数字值&#xff0c;进度条外展示 百分比数值 数据 data() {return {reNum: 3214,rePer:40,warmPer: 40,warmNum:2132,}}因为样式要求&#xff0c;显示的百分数也是自己写的哈 &#xff0c;没有用进度条自带的 代码 <div class"pick"&g…

2024五一杯数学建模A题思路分析-钢板最优切割路径问题

文章目录 1 赛题选题分析 2 解题思路3 最新思路更新 1 赛题 A题 钢板最优切割路径问题 提高钢板下料切割过程中的工作效率&#xff0c;是模具加工企业降低成本和增加经济效益的重要途径&#xff0c;其中钢板切割的路径规划是钢板切割过程的一个关键环节。 钢板切割就是使用特殊…

【C++】哈希的应用---布隆过滤器

目录 1、引入 2、布隆过滤器概念 3、选择哈希函数个数和布隆过滤器长度 4、布隆过滤器的实现 ①框架的搭建 ②设置存在 ③检查存在 ④不支持 reset 5、布隆过滤器计算误差 6、布隆过滤器的优缺点 ①布隆过滤器优点 ②布隆过滤器缺陷 7、布隆过滤器的实际应用 8、完…

KUKA机器人KR3 R540维护保养——涂润滑脂

KUKA机器人在保养时少不了润滑脂&#xff0c;不同型号的机器人需要的润滑脂类型也不一样&#xff0c;保养时注意选用合适的润滑脂。本篇文章以KUKA机器人KR3 R540为例&#xff0c;在轴盖板 A2、A3、A5 的内侧涂上润滑脂。 一、涂润滑脂的作用 拆开机器人一个轴的盖板&am…

Android Kernel源码下载方法

Android Kernel的源码是git管理的&#xff0c;和之前下载的Android源码管理方式不一样&#xff0c;所以下载方式也不一样&#xff0c;直接用git下载就可以了&#xff1b;去网上搜的下载方式五花八门&#xff0c;有很多问题&#xff0c;因为服务器经常无法访问&#xff0c;也一直…

质谱原理与仪器3-笔记

质谱原理与仪器3-笔记 一、质量分析器类型1、聚焦磁场分析器&#xff1a;A、单聚焦磁场分析器B、双聚焦磁场分析器 2、四极杆质量分析器3、飞行时间质谱仪(Time of Flight MS, TOF-MS)4、离子阱质量分析器 二、质谱仪的主要性能指标1、质量范围(mass range)2、分辨率(resolutio…

C语言编译的优化等级应该选哪个?O0、O1、O2还是O3

在使用IDE开发STM32程序时&#xff0c;IDE一般都会提供优化等级设置的选项&#xff0c;例如下图中KEIL软件优化等级的设置。 从上图中也可以看出&#xff0c;设置不同的优化等级&#xff0c;实际上是修改了编译器的编译参数。这个编译器是由ARM公司提供的C/C编译器armclang或者…

家里挂宗教画是否合适?

凡在我们这改名取名的客户&#xff0c;峰民都会建议他们挂一副这样或那样的画&#xff0c;这也是根据你命中的五行来给你助运的。而现在许多人都有自己的宗教信仰&#xff0c;有些人会在家供奉佛像&#xff0c;有些人甚至会在家里挂上宗教画&#xff0c;亦同样产生五行效应&…