java操作HBase进行数据的添加和查询

2018-01-08 13:34:03来源:网络收集作者:纳米程序员人点击

分享

阿里云爆款

最近项目中使用了hadoop,这里记录一下使用java操作HBase的一些代码供读者参考.


这里需要提到一位大神,就是我们的老总,看了他的博客确实获益良多,这里贴出他的博客链接,各位读者可以看看,相信你肯定能学到一些有用的东西. http://my.csdn.net/yinwenjie?locationNum=0&fps=1


好了,进入今天的正题:


使用这边博文的代码必须建立在,你的hadoop环境已经建好,hbase也已经建好,如何搭建这里就不在说明了,可以自行搜索一下


首先是初始化连接hbase的HbaseTemplate :我使用的是springboot


引入需要的jar包



org.apache.hbase
hbase-client
0.96.1.1-hadoop2


org.slf4j
slf4j-log4j12




初始化HbaseTemplate


package com.economic.system.aggregation.common.utils;import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
/**
 * 初始化HbaseTemplate
 * @author yhy
 *
 */
@Configuration
public class HBaseConfiguration {
  
  @Value("${hbase.zookeeper.clientPort}")
  private String clientPort;
  
  @Value("${hbase.zookeeper.quorum}")
  private String quorum;
  
  @Bean
  public HbaseTemplate getHbaseTemplate() {//这里需要写Configuration全名,是因为这个类上有一个注解@Configuration,名称相同了    org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
    configuration = org.apache.hadoop.hbase.HBaseConfiguration.create();
    configuration.set("hbase.zookeeper.property.clientPort", this.clientPort);//Hbase中zookeeper的端口号;默认是2181
    configuration.set("hbase.zookeeper.quorum", this.quorum);//hadoop的地址,这里需要在系统的host文件配置如:hadoop1,hadoop2,host文件中未:192.168.0.1 hadoop1192.168.0.2 hadoop2
    HbaseTemplate ht = new HbaseTemplate(configuration);
    return ht;
  }
}这样,就把HbaseTemplate初始化好了,就可以开始使用了


这里使用了一个service接口和一个实现类


package com.economic.system.aggregation.service;
import java.util.List;
/**
* 操作HBaseservice
* @author yhy
*
*/
public interface HBaseService {
/**
* 在HBase上面创建表
* @param tableName表名
* @param family 列族名(可以同时传入多个列族名)
* @return
*/
public boolean createTable(String tableName,String ...family);
/**
* Scan 查询所有的hbase数据
* @param tableName 表名
* @param 返回数据类型
* @return
*/
public List searchAll(String tableName,Class c);
/**
* 向表中插入数据
* @param
* @param tableName 表名
* @param family 列族
* @param rowkey
* @return
*/
public Object createPro(Object c,String tableName,String family,String rowkey);
/**
* 通过表名和rowkey获取一行数据
* @param 数据类型
* @param tableName 表名
* @param rowkey
* @return
*/
public T getOne(Class c,String tableName,String rowkey);
/**
* 查询一条记录一个column的值
* @param tableName 表名
* @param rowkey
* @param family 列族
* @param column 列
* @return
*/
public String getColumn(String tableName,String rowkey,String family,String column);
/**
* 查询开始row和结束row之间的数据
* @param 数据类型
* @param tableName 表名
* @param startRow 开始row
* @param endRow 结束row
* @return
*/
public List findByRowRange(Class c,String tableName,String startRow,String endRow);
}实现类:


package com.economic.system.aggregation.service.internal;
import java.beans.PropertyDescriptor;
import java.io.IOException;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeanWrapper;
import org.springframework.beans.PropertyAccessorFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.data.hadoop.hbase.RowMapper;
import org.springframework.data.hadoop.hbase.TableCallback;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.economic.system.aggregation.service.HBaseService;
/**
* HBase service实现类
*
* @author yhy
*
*/
@Service("hbaseService")
public class HBaseServiceImpl implements HBaseService {
@Autowired
private HbaseTemplate hbaseTemplate;
@SuppressWarnings({"resource", "deprecation"})
@Override
public boolean createTable(String tableName, String... column) {
HBaseAdmin admin;
try {
// 从hbaseTemplate 获取configuration对象,用来初始化admin
admin = new HBaseAdmin(hbaseTemplate.getConfiguration());
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
for (int i = 0; i < column.length; i++) {
tableDescriptor.addFamily(new HColumnDescriptor(column[i]));
}
admin.createTable(tableDescriptor);
return admin.tableExists(tableName);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
@Override
public List searchAll(String tableName, Class c) {
return hbaseTemplate.find(tableName, new Scan(), new RowMapper() {
@Override
public T mapRow(Result result, int rowNum) throws Exception {
T pojo = c.newInstance();
BeanWrapper beanWrapper = PropertyAccessorFactory.forBeanPropertyAccess(pojo);
List ceList = result.listCells();
for (Cell cellItem : ceList) {
String cellName = new String(CellUtil.cloneQualifier(cellItem));
if (!"class".equals(cellName)) {
beanWrapper.setPropertyValue(cellName, new String(CellUtil.cloneValue(cellItem)));
}
}
return pojo;
}
});
}
@Override
public Object createPro(Object pojo, String tableName, String column, String rowkey) {
if (pojo == null || StringUtils.isBlank(tableName) || StringUtils.isBlank(column)) {
return null;
}
return hbaseTemplate.execute(tableName, new TableCallback() {
@Override
public Object doInTable(HTableInterface table) throws Throwable {
PropertyDescriptor[] pds = BeanUtils.getPropertyDescriptors(pojo.getClass());
BeanWrapper beanWrapper = PropertyAccessorFactory.forBeanPropertyAccess(pojo);
Put put = new Put(Bytes.toBytes(rowkey));
for (PropertyDescriptor propertyDescriptor : pds) {
String properName = propertyDescriptor.getName();
String value = beanWrapper.getPropertyValue(properName).toString();
if (!StringUtils.isBlank(value)) {
put.add(Bytes.toBytes(column), Bytes.toBytes(properName), Bytes.toBytes(value));
}
}
table.put(put);
return null;
}
});
}
@Override
public T getOne(Class c, String tableName, String rowkey) {
if (c == null || StringUtils.isBlank(tableName) || StringUtils.isBlank(rowkey)) {
return null;
}
return hbaseTemplate.get(tableName, rowkey, new RowMapper() {
public T mapRow(Result result, int rowNum) throws Exception {
List ceList = result.listCells();
JSONObject obj = new JSONObject();
T item = c.newInstance();
if (ceList != null && ceList.size() > 0) {
for (Cell cell : ceList) {
obj.put(
Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength()),
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
}
} else {
return null;
}
item = JSON.parseObject(obj.toJSONString(), c);
return item;
}
});
}
@Override
public String getColumn(String tableName, String rowkey, String family, String column) {
if (StringUtils.isBlank(tableName) || StringUtils.isBlank(family)
|| StringUtils.isBlank(rowkey) || StringUtils.isBlank(column)) {
return null;
}
return hbaseTemplate.get(tableName, rowkey, family, column, new RowMapper() {
public String mapRow(Result result, int rowNum) throws Exception {
List ceList = result.listCells();
String res = "";
if (ceList != null && ceList.size() > 0) {
for (Cell cell : ceList) {
res =
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
}
}
return res;
}
});
}
@Override
public List findByRowRange(Class c, String tableName, String startRow, String endRow) {
if (c == null || StringUtils.isBlank(tableName) || StringUtils.isBlank(startRow)
|| StringUtils.isBlank(endRow)) {
return null;
}
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(startRow));
scan.setStopRow(Bytes.toBytes(endRow));
scan.setCacheBlocks(false);
scan.setCaching(2000);
return hbaseTemplate.find(tableName, scan, new RowMapper() {
@Override
public T mapRow(Result result, int rowNum) throws Exception {
T pojo = c.newInstance();
BeanWrapper beanWrapper = PropertyAccessorFactory.forBeanPropertyAccess(pojo);
List ceList = result.listCells();
for (Cell cellItem : ceList) {
String cellName = new String(CellUtil.cloneQualifier(cellItem));
if (!"class".equals(cellName)) {
beanWrapper.setPropertyValue(cellName, new String(CellUtil.cloneValue(cellItem)));
}
}
return pojo;
}
});
}
}最后是调用了,方法就自己写吧,就调用service中的方法就行了.


最新文章

123

最新摄影

微信扫一扫

第七城市微信公众平台