Java自学者论坛

 找回密码
 立即注册

手机号码,快捷登录

恭喜Java自学者论坛(https://www.javazxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,会员资料板块,购买链接:点击进入购买VIP会员

JAVA高级面试进阶训练营视频教程

Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程Go语言视频零基础入门到精通Java架构师3期(课件+源码)
Java开发全终端实战租房项目视频教程SpringBoot2.X入门到高级使用教程大数据培训第六期全套视频教程深度学习(CNN RNN GAN)算法原理Java亿级流量电商系统视频教程
互联网架构师视频教程年薪50万Spark2.0从入门到精通年薪50万!人工智能学习路线教程年薪50万大数据入门到精通学习路线年薪50万机器学习入门到精通教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程MySQL入门到精通教程
查看: 770|回复: 0

关于Hbase的预分区,解决热点问题

[复制链接]
  • TA的每日心情
    奋斗
    2024-4-6 11:05
  • 签到天数: 748 天

    [LV.9]以坛为家II

    2034

    主题

    2092

    帖子

    70万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    705612
    发表于 2021-7-7 08:09:55 | 显示全部楼层 |阅读模式

    Hbase默认建表是只有一个分区的,开始的时候所有的数据都会查询这个分区,当这个分区达到一定大小的时候,就会进行做split操作;

    因此为了确保regionserver的稳定和高效,应该尽量避免region分裂和热点的问题;

    那么有的同学在做预分区的时候,可能是按照:

    1):

    通过Hbase提供的api:
    bin/hbase org.apache.hadoop.hbase.util.RegionSplitter demo1 HexStringSplit -c 10 -f info
    
    默认建表是没有开启Bloomfilter和压缩参数的,这里为了提供读性能,建议开启Bloomfilter,同时使用压缩SNAPPY,进入hbase shell,首先需要disable 'poidb',然后使用使用 
    
    alter 'poidb',{NAME => 'info',BLOOMFILTER => 'ROWCOL',COMPRESSION => 'SNAPPY',VERSIONS => '1'} 
    
    -C 多少个分区
    -f 列族

    2):

    通过指定create命令

    3):

    没做任何修饰的代码操作

    package com.dongfeng.code.tools.writeDb
    
    import com.dongfeng.code.tools.GlobalConfigUtils
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
    import org.apache.hadoop.hbase.client.{Admin, Connection, ConnectionFactory}
    import org.apache.hadoop.hbase.util.Bytes
    
    /**
      * Created by angel
      */
    object WriteToHbaseDB {
      private val config: Configuration = HBaseConfiguration.create()
      config.set("hbase.zookeeper.quorum" , GlobalConfigUtils.hbaseQuorem)
      config.set("hbase.master" , GlobalConfigUtils.hbaseMaster)
      config.set("hbase.zookeeper.property.clientPort" , GlobalConfigUtils.clientPort)
      config.set("hbase.rpc.timeout" , GlobalConfigUtils.rpcTimeout)
      config.set("hbase.client.operator.timeout" , GlobalConfigUtils.operatorTimeout)
      //def scannTimeout = conf.getString("c")
      config.set("hbase.client.scanner.timeout.period" , GlobalConfigUtils.scannTimeout)
      private val conn: Connection = ConnectionFactory.createConnection(config)
      private val admin: Admin = conn.getAdmin
      //创建表
      def createTable(tableName:TableName, columnFamily:String) = {
    
        val hTableDescriptor = new HTableDescriptor(tableName)
        val hColumnDescriptor = new HColumnDescriptor(columnFamily)
        hTableDescriptor.addFamily(hColumnDescriptor)
        //如果表不存在则创建表
        if(!admin.tableExists(tableName)){
          var splitKeys: List[Array[Byte]] = List(
            Bytes.toBytes("40000") ,
            Bytes.toBytes("80000") ,
            Bytes.toBytes("120000") ,
            Bytes.toBytes("160000")
          )
    //      for (x <- 1 to 5) {
    //        if(x<10){
    //          splitKeys = splitKeys.+:(Bytes.toBytes(x.toString))
    //        }else{
    //          splitKeys = splitKeys.+:(Bytes.toBytes(x.toString))
    //        }
    //      }
          try{
            //创建表
            admin.createTable(hTableDescriptor, splitKeys.toArray)
          }finally {
            admin.close()
          }
        }
      }
    
      def main(args: Array[String]): Unit = {
        createTable(TableName.valueOf("demo3") , "info")
      }
    }
    View Code

     

    其实上面的这些操作,会无形中限制我们的rowkey的最初设计,既要考虑高效的字典排列方式,还要考虑热点问题。往往稍微有点偏差,就会出现大部分的数据都往一个region中跑,显然不合理

    因此,我觉得至少在我的业务中是需要进行rowkey的加盐或者MD5操作的,达到rowkey的散列

    我这里进行MD5加密处理

    package com.df.tools
    
    import java.util.concurrent.atomic.AtomicInteger
    
    import com.df.Contant.GlobalConfigUtils
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase._
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.{Base64, Bytes, MD5Hash}
    import org.apache.hadoop.hbase.util.RegionSplitter.HexStringSplit
    
    /**
      * Created by angel
      */
    object HbaseTools {
      private val config: Configuration = HBaseConfiguration.create()
      config.set("hbase.zookeeper.quorum" , GlobalConfigUtils.hbaseQuorem)
      config.set("hbase.master" , GlobalConfigUtils.hbaseMaster)
      config.set("hbase.zookeeper.property.clientPort" , GlobalConfigUtils.clientPort)
      config.set("hbase.rpc.timeout" , GlobalConfigUtils.rpcTimeout)
      config.set("hbase.client.operator.timeout" , GlobalConfigUtils.operatorTimeout)
      config.set("hbase.client.scanner.timeout.period" , GlobalConfigUtils.scannTimeout)
      private val conn: Connection = ConnectionFactory.createConnection(config)
      private val admin: Admin = conn.getAdmin
      val atomic = new AtomicInteger(0)
      var resultAtomic = 0
      /**
        * @return 构建表的连接
        * */
      def Init(tableName: String , columnFamily:String):Table = {
        val hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName))
        val hColumnDescriptor = new HColumnDescriptor(columnFamily)
        hTableDescriptor.addFamily(hColumnDescriptor)
        if(!admin.tableExists(TableName.valueOf(tableName))){
    //      admin.createTable(hTableDescriptor)
          createHTable(conn , tableName , 10 , Array(columnFamily))
    
    
        }
        conn.getTable(TableName.valueOf(tableName))
      }
    
      // 对指定的列构造rowKey,采用Hash前缀拼接业务主键的方法
      def rowKeyWithHashPrefix(column: String*): Array[Byte] = {
        val rkString = column.mkString("")
        val hash_prefix = getHashCode(rkString)
        val rowKey = Bytes.add(Bytes.toBytes(hash_prefix), Bytes.toBytes(rkString))
        rowKey
      }
    
      // 对指定的列构造rowKey, 采用Md5 前缀拼接业务主键方法,主要目的是建表时采用MD5 前缀进行预分区
      def rowKeyWithMD5Prefix(separator:String,length: Int,column: String*): Array[Byte] = {
        val columns = column.mkString(separator)
    
        var md5_prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(columns))
        if (length < 8){
          md5_prefix = md5_prefix.substring(0, 8)
        }else if (length >= 8 || length <= 32){
          md5_prefix = md5_prefix.substring(0, length)
        }
        val row = Array(md5_prefix,columns)
        val rowKey = Bytes.toBytes(row.mkString(separator))
        rowKey
      }
    
      // 对指定的列构造RowKey,采用MD5方法
      def rowKeyByMD5(column: String*): Array[Byte] = {
        val rkString = column.mkString("")
        val md5 = MD5Hash.getMD5AsHex(Bytes.toBytes(rkString))
        val rowKey = Bytes.toBytes(md5)
        rowKey
      }
      // 直接拼接业务主键构造rowKey
      def rowKey(column:String*):Array[Byte] = Bytes.toBytes(column.mkString(""))
    
      // Hash 前缀的方法:指定列拼接之后与最大的Short值做 & 运算
      // 目的是预分区,尽量保证数据均匀分布
      private def getHashCode(field: String): Short ={
        (field.hashCode() & 0x7FFF).toShort
      }
    
      /**
        * @param tablename 表名
        * @param regionNum 预分区数量
        * @param columns 列簇数组
        */
      def createHTable(connection: Connection, tablename: String,regionNum: Int, columns: Array[String]): Unit = {
    
        val nameSpace = "df"
        val hexsplit: HexStringSplit = new HexStringSplit()
        // 预先构建分区,指定分区的start key
        val splitkeys: Array[Array[Byte]] = hexsplit.split(regionNum)
    
        val admin = connection.getAdmin
    
        val tableName = TableName.valueOf(tablename)
    
        if (!admin.tableExists(tableName)) {
          val tableDescriptor = new HTableDescriptor(tableName)
    
          if (columns != null) {
            columns.foreach(c => {
              val hcd = new HColumnDescriptor(c.getBytes()) //设置列簇
              hcd.setMaxVersions(1)
              hcd.setCompressionType(Algorithm.SNAPPY) //设定数据存储的压缩类型.默认无压缩(NONE)
              tableDescriptor.addFamily(hcd)
            })
          }
          admin.createTable(tableDescriptor,splitkeys)
        }
    
      }
    
    
      /**
        * @param tableName
        * @param key
        * @param columnFamily
        * @param column
        * @param data 要落地的数据
        * */
      def putData(tableName: String , key:String , columnFamily:String , column:String , data:String):Int = {
        val table: Table = Init(tableName , columnFamily)
        try{
          val rowkey = HbaseTools.rowKeyByMD5(key)
          val put: Put = new Put(rowkey)
          put.addColumn(Bytes.toBytes(columnFamily) ,Bytes.toBytes(column.toString) , Bytes.toBytes(data.toString))
          table.put(put)
          resultAtomic = atomic.incrementAndGet()
        }catch{
          case e:Exception => e.printStackTrace()
            resultAtomic = atomic.decrementAndGet()
        }finally {
          table.close()
        }
        resultAtomic
      }
    
      /**
        * @param mapData 要插入的数据[列明 , 值]
        * */
    
      def putMapData(tableName: String , columnFamily:String, key:String  , mapData:Map[String , String]):Int = {
        val table: Table = Init(tableName , columnFamily)
        try{
          //TODO rowKeyWithMD5Prefix
          val rowkey = HbaseTools.rowKeyByMD5(key)
          val put: Put = new Put(rowkey)
          if(mapData.size > 0){
            for((k , v) <- mapData){
              put.addColumn(Bytes.toBytes(columnFamily) ,Bytes.toBytes(k.toString) , Bytes.toBytes(v.toString))
            }
          }
          table.put(put)
          resultAtomic = atomic.incrementAndGet()
        }catch{
          case e:Exception => e.printStackTrace()
            resultAtomic = atomic.decrementAndGet()
        }finally {
          table.close()
        }
        resultAtomic
      }
    
      def deleteData(tableName: String , rowKey:String , columnFamily:String):Int ={
        val table: Table = Init(tableName , columnFamily)
        try{
          val delete = new Delete(Bytes.toBytes(rowKey))
          table.delete(delete)
          resultAtomic = atomic.decrementAndGet()
        }catch{
          case e:Exception => e.printStackTrace()
            resultAtomic = atomic.decrementAndGet()
        }finally {
          table.close()
        }
        resultAtomic
      }
    
    
      def convertScanToString(scan: Scan):String={
        val proto = ProtobufUtil.toScan(scan)
        return Base64.encodeBytes(proto.toByteArray)
      }
    }
    View Code

     

    哎...今天够累的,签到来了1...
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|小黑屋|Java自学者论坛 ( 声明:本站文章及资料整理自互联网,用于Java自学者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-4-30 04:03 , Processed in 0.063809 second(s), 29 queries .

    Powered by Discuz! X3.4

    Copyright © 2001-2021, Tencent Cloud.

    快速回复 返回顶部 返回列表