package com.df.tools
import java.util.concurrent.atomic.AtomicInteger
import com.df.Contant.GlobalConfigUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes, MD5Hash}
import org.apache.hadoop.hbase.util.RegionSplitter.HexStringSplit
/**
* Created by angel
*/
object HbaseTools {
  // Shared client configuration, built once on first touch of this object.
  private val config: Configuration = HBaseConfiguration.create()
  config.set("hbase.zookeeper.quorum", GlobalConfigUtils.hbaseQuorem)
  config.set("hbase.master", GlobalConfigUtils.hbaseMaster)
  config.set("hbase.zookeeper.property.clientPort", GlobalConfigUtils.clientPort)
  config.set("hbase.rpc.timeout", GlobalConfigUtils.rpcTimeout)
  // FIX: the real HBase property is "hbase.client.operation.timeout";
  // the previous key "hbase.client.operator.timeout" was silently ignored.
  config.set("hbase.client.operation.timeout", GlobalConfigUtils.operatorTimeout)
  config.set("hbase.client.scanner.timeout.period", GlobalConfigUtils.scannTimeout)

  // One heavyweight, thread-safe Connection (and Admin) shared by all helpers;
  // intentionally never closed, since this object lives for the whole process.
  private val conn: Connection = ConnectionFactory.createConnection(config)
  private val admin: Admin = conn.getAdmin

  // Running success/failure counter shared by the mutation helpers below:
  // puts increment on success, puts/deletes decrement on failure, and
  // deleteData also decrements on success (preserved legacy behavior).
  val atomic = new AtomicInteger(0)
  // Last counter value observed by a mutation helper (legacy shared state;
  // not safe to rely on under concurrent callers — prefer the return values).
  var resultAtomic = 0

  /**
   * Ensures `tableName` exists — creating it pre-split into 10 regions with
   * the given column family if it does not — and returns a handle to it.
   *
   * @param tableName    table to open (and possibly create)
   * @param columnFamily column family used only when the table must be created
   * @return a live [[Table]] handle; the caller is responsible for closing it
   */
  def Init(tableName: String, columnFamily: String): Table = {
    val tn = TableName.valueOf(tableName)
    if (!admin.tableExists(tn)) {
      createHTable(conn, tableName, 10, Array(columnFamily))
    }
    conn.getTable(tn)
  }

  /**
   * Builds a row key as <hash-prefix><concatenated business keys>.
   * The 2-byte hash prefix spreads writes evenly across pre-split regions.
   */
  def rowKeyWithHashPrefix(column: String*): Array[Byte] = {
    val rkString = column.mkString("")
    Bytes.add(Bytes.toBytes(getHashCode(rkString)), Bytes.toBytes(rkString))
  }

  /**
   * Builds a row key as "<md5-prefix><separator><business keys joined by separator>".
   * Intended for tables pre-split on MD5 prefixes.
   *
   * The requested prefix length is clamped to [8, 32] — an MD5 hex digest is
   * exactly 32 characters, so the previous unclamped `substring(0, length)`
   * threw StringIndexOutOfBoundsException for length > 32. Behavior for
   * lengths that worked before (anything <= 32) is unchanged.
   *
   * @param separator string placed between the prefix and each business key
   * @param length    desired MD5 prefix length, clamped to [8, 32]
   * @param column    business key parts
   */
  def rowKeyWithMD5Prefix(separator: String, length: Int, column: String*): Array[Byte] = {
    val columns = column.mkString(separator)
    val prefixLen = math.min(math.max(length, 8), 32)
    val md5Prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(columns)).substring(0, prefixLen)
    Bytes.toBytes(Array(md5Prefix, columns).mkString(separator))
  }

  /** Builds a row key that is the MD5 hex digest of the concatenated columns. */
  def rowKeyByMD5(column: String*): Array[Byte] = {
    val rkString = column.mkString("")
    Bytes.toBytes(MD5Hash.getMD5AsHex(Bytes.toBytes(rkString)))
  }

  /** Builds a row key by directly concatenating the business key parts. */
  def rowKey(column: String*): Array[Byte] = Bytes.toBytes(column.mkString(""))

  // Hash-prefix helper: hashCode masked with Short.MaxValue (0x7FFF) so the
  // prefix is a non-negative Short, keeping row keys uniformly distributed.
  private def getHashCode(field: String): Short =
    (field.hashCode() & 0x7FFF).toShort

  /**
   * Creates a pre-split table in the "df" flow if it does not already exist.
   *
   * @param connection live HBase connection
   * @param tablename  table name
   * @param regionNum  number of pre-split regions (hex-string split points)
   * @param columns    column family names; may be null for none
   */
  def createHTable(connection: Connection, tablename: String, regionNum: Int, columns: Array[String]): Unit = {
    val nameSpace = "df"
    // Pre-compute region boundaries (start keys) with a hex-string splitter.
    val splitkeys: Array[Array[Byte]] = new HexStringSplit().split(regionNum)
    val admin = connection.getAdmin
    val tableName = TableName.valueOf(tablename)
    if (!admin.tableExists(tableName)) {
      val tableDescriptor = new HTableDescriptor(tableName)
      if (columns != null) {
        columns.foreach { c =>
          val hcd = new HColumnDescriptor(c.getBytes()) // column family
          hcd.setMaxVersions(1)
          hcd.setCompressionType(Algorithm.SNAPPY) // default would be NONE
          tableDescriptor.addFamily(hcd)
        }
      }
      admin.createTable(tableDescriptor, splitkeys)
    }
  }

  /**
   * Writes a single cell. The row key is the MD5 of `key`.
   *
   * @param tableName    target table (created on demand)
   * @param key          business key, hashed into the row key
   * @param columnFamily column family
   * @param column       column qualifier
   * @param data         cell value
   * @return the shared counter after this call: incremented on success,
   *         decremented on failure (exceptions are logged, not rethrown)
   */
  def putData(tableName: String, key: String, columnFamily: String, column: String, data: String): Int = {
    val table: Table = Init(tableName, columnFamily)
    try {
      val put = new Put(HbaseTools.rowKeyByMD5(key))
      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data))
      table.put(put)
      resultAtomic = atomic.incrementAndGet()
    } catch {
      case e: Exception =>
        e.printStackTrace()
        resultAtomic = atomic.decrementAndGet()
    } finally {
      table.close()
    }
    resultAtomic
  }

  /**
   * Writes one row with many cells in a single Put.
   *
   * @param tableName    target table (created on demand)
   * @param columnFamily column family for every cell
   * @param key          business key, hashed (MD5) into the row key
   * @param mapData      qualifier -> value pairs to write
   * @return the shared counter after this call: incremented on success,
   *         decremented on failure (exceptions are logged, not rethrown)
   */
  def putMapData(tableName: String, columnFamily: String, key: String, mapData: Map[String, String]): Int = {
    val table: Table = Init(tableName, columnFamily)
    try {
      // TODO: consider rowKeyWithMD5Prefix for better region spread.
      val put = new Put(HbaseTools.rowKeyByMD5(key))
      if (mapData.nonEmpty) {
        for ((qualifier, value) <- mapData) {
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), Bytes.toBytes(value))
        }
      }
      table.put(put)
      resultAtomic = atomic.incrementAndGet()
    } catch {
      case e: Exception =>
        e.printStackTrace()
        resultAtomic = atomic.decrementAndGet()
    } finally {
      table.close()
    }
    resultAtomic
  }

  /**
   * Deletes a whole row by its literal row key (NOT MD5-hashed, unlike the
   * put helpers — callers must pass the stored key).
   *
   * @return the shared counter after this call; note it is decremented on
   *         both success and failure (preserved legacy behavior)
   */
  def deleteData(tableName: String, rowKey: String, columnFamily: String): Int = {
    val table: Table = Init(tableName, columnFamily)
    try {
      table.delete(new Delete(Bytes.toBytes(rowKey)))
      resultAtomic = atomic.decrementAndGet()
    } catch {
      case e: Exception =>
        e.printStackTrace()
        resultAtomic = atomic.decrementAndGet()
    } finally {
      table.close()
    }
    resultAtomic
  }

  /**
   * Serializes a [[Scan]] to a Base64 string (protobuf form), as required by
   * TableMapReduceUtil-style job configuration values.
   */
  def convertScanToString(scan: Scan): String = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }
}