admin 管理员组文章数量: 894082
Hive to HBase
-- hive -e 'show create table grades' > table
-- Definition of the Hive table mydb.grades, captured from SHOW CREATE TABLE
-- (see the hive -e command above). The table has three columns: id, name, age.
-- The Java code further down reads this text as its schema file and picks out
-- the column lines, so the line layout below is kept exactly as generated.
CREATE TABLE `mydb.grades`(
`id` int COMMENT 'ID',
`name` string COMMENT '姓名',
`age` int COMMENT '年龄')
-- Row format clause as emitted by Hive: comma-separated fields, newline-separated rows.
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
-- The data files are read and written through the RCFile input/output formats.
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.RCFileInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.RCFileOutputFormat'
-- HDFS directory that holds the table data.
LOCATION
'hdfs://192.168.253.11:9000/user/root/hive/warehouse/mydb.db/grades'
-- Table properties recorded at capture time.
-- NOTE(review): these look like Hive-maintained statistics rather than
-- hand-written settings; confirm before reusing this DDL to create a table.
TBLPROPERTIES (
'COLUMN_STATS_ACCURATE'='true',
'numFiles'='1',
'numRows'='0',
'rawDataSize'='0',
'totalSize'='30',
'transient_lastDdlTime'='1457602162')
package edu.wzm.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hive.hcatalog.common.HCatUtil;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
/**
* Created by GatsbyNewton on 2016/3/24.
*/
public class HiveTableUtils {
// Get the Hive table's column names by parsing the saved table-definition file.
/**
 * Extracts column names from a text file that holds a Hive table definition
 * in the layout printed by {@code SHOW CREATE TABLE}.
 *
 * <p>A line is treated as a column line when it contains the word
 * {@code COMMENT} and a pair of backticks; the column name is the text between
 * the first backtick and the next one. Columns declared without a
 * {@code COMMENT} clause are therefore not picked up.
 *
 * <p>If the file does not exist, an error is printed and the JVM exits with
 * status 1. Any exception raised while reading is printed to stderr and the
 * names collected so far are returned.
 *
 * @param filePath path of the table-definition file
 * @return the column names in file order; possibly empty, never {@code null}
 */
public static List<String> getFieldName(String filePath){
    List<String> fieldName = new ArrayList<String>();
    File file = new File(filePath);
    if (!file.exists()) {
        System.err.println("The file doesn't exist!");
        System.exit(1);
    }

    BufferedReader reader = null;
    try {
        reader = new BufferedReader(new FileReader(file));
        String line;
        while ((line = reader.readLine()) != null) {
            if (!line.contains("COMMENT")) {
                continue;
            }
            int start = line.indexOf('`');
            if (start < 0) {
                continue;
            }
            // Use the backtick that closes the identifier, not the last one on
            // the line: the comment text itself may contain a backtick.
            int end = line.indexOf('`', start + 1);
            if (end < 0) {
                // Unbalanced backtick: not a column line, skip it instead of
                // letting substring() throw and abort the whole parse.
                continue;
            }
            fieldName.add(line.substring(start + 1, end));
        }
    }
    catch (Exception e) {
        e.printStackTrace();
    }
    finally {
        // Always release the file handle, including when reading failed.
        if (reader != null) {
            try {
                reader.close();
            }
            catch (Exception closeError) {
                closeError.printStackTrace();
            }
        }
    }
    return fieldName;
}
import edu.wzm.transform.RCFileToHFile;
import edu.wzm.utils.HiveTableUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.common.HCatUtil;
import org.apache.hive.hcatalog.rcfile.RCFileMapReduceInputFormat;
import java.util.List;
/**
* Created by GatsbyNewton on 2016/3/24.
*/
public class Driver extends Configured implements Tool{
private static Configuration conf = new Configuration();
private static Configuration hconf = null;
private static HBaseAdmin hadmin = null;
public static void connectHBase(){
final String HBASE_CONFIG_ZOOKEEPER_CLIENT = "hbase.zookeeper.property.clientPort";
final String HBASE_ZOOKEEPER_CLIENT_PORT = "2181";
final String HBASE_CONFIG_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
final String HBASE_ZOOKEEPER_SERVER = "hbase38,hbase43,hbase00";
conf.set(HBASE_CONFIG_ZOOKEEPER_CLIENT, HBASE_ZOOKEEPER_CLIENT_PORT);
conf.set(HBASE_CONFIG_ZOOKEEPER_QUORUM, HBASE_ZOOKEEPER_SERVER);
hconf = HBaseConfiguration.create(conf);
try{
hadmin = new HBaseAdmin(hconf);
}
catch (Exception e){
e.printStackTrace();
}
}
public static void main(String[] args)throws Exception{
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if(otherArgs.length != 4){
System.err.println("Usage: <rcfile> <hfile> <schemafile> <hbasetable>");
System.exit(1);
}
String path = System.getProperty("user.dir") + otherArgs[2];
List<String> fieldNames = HiveTableUtils.getFieldName(path);
StringBuilder sb = new StringBuilder(fieldNames.get(0));
int size = fieldNames.size();
for(int i = 1; i < size; i++){
sb.append(":").append(fieldNames.get(i));
}
conf.set("schema", sb.toString());
if(ToolRunner.run(conf, new Driver(), otherArgs) == 0){
// Importing the generated HFiles into a HBase table
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
loader.doBulkLoad(new Path(otherArgs[1], otherArgs[3]);
System.exit(0);
}
else{
System.exit(1);
}
}
@SuppressWarnings("deprecation")
@Override
public int run(String[] strings) throws Exception {
Configuration config = getConf();
Driver.connectHBase();
Job job = new Job(config, "RCFile to HFile");
job.setJarByClass(Driver.class);
job.setMapperClass(RCFileToHFile.ParseMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
//Reduce's number is 0.
job.setNumReduceTasks(0);
job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
job.setInputFormatClass(RCFileMapReduceInputFormat.class);
// job.setOutputFormatClass(HFileOutputFormat.class);
HTable table = new HTable(config, strings[3]);
HFileOutputFormat.configureIncrementalLoad(job, table);
RCFileMapReduceInputFormat.addInputPath(job, new Path(strings[0]));
FileOutputFormat.setOutputPath(job, new Path(strings[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
}
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import edu.wzm.utils.HiveTableUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hive.hcatalog.rcfile.RCFileMapReduceInputFormat;
/**
 * Holds the mapper that turns RCFile rows into HBase {@link KeyValue}s.
 */
public class RCFileToHFile {

    /**
     * Emits one {@link KeyValue} per non-key column of each input row.
     *
     * <p>The first column of a row is used as the HBase row key. Every other
     * column is written to column family {@code cf}, with the column name
     * taken from the job's {@code schema} setting as its qualifier.
     */
    public static class ParseMapper extends Mapper<LongWritable, BytesRefArrayWritable, ImmutableBytesWritable, KeyValue>{

        // Text decodes its bytes as UTF-8, so encode with UTF-8 as well rather
        // than with the platform default charset.
        private static final java.nio.charset.Charset UTF8 = java.nio.charset.Charset.forName("UTF-8");
        private static final byte[] COLUMN_FAMILY = "cf".getBytes(UTF8);

        // Column names, in table order, split from the "schema" setting.
        private String[] fieldName = null;

        /**
         * Reads the ':'-separated column names from the {@code schema}
         * configuration key.
         *
         * @throws IOException if the {@code schema} key is not set
         */
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            super.setup(context);
            Configuration conf = context.getConfiguration();
            String schema = conf.get("schema");
            if (schema == null) {
                throw new IOException("Job configuration is missing the \"schema\" setting");
            }
            fieldName = schema.split(":");
        }

        /**
         * Converts one row into KeyValues and writes them keyed by the row key.
         *
         * <p>Rows with no columns are skipped. When a row has fewer columns
         * than the schema, only the columns that are present are written.
         *
         * @param key     input key supplied by the input format; not used
         * @param values  the column values of one row
         * @param context job context used to emit the KeyValues
         */
        @Override
        protected void map(LongWritable key, BytesRefArrayWritable values,
                Context context)
                throws IOException, InterruptedException {
            int size = values.size();
            if (size == 0) {
                return;
            }

            Text cell = new Text();
            List<String> fields = new ArrayList<String>(size);
            for(int i = 0; i < size; i++){
                BytesRefWritable value = values.get(i);
                cell.set(value.getData(), value.getStart(), value.getLength());
                fields.add(cell.toString());
            }

            byte[] rowKey = fields.get(0).getBytes(UTF8);
            ImmutableBytesWritable hKey = new ImmutableBytesWritable();
            hKey.set(rowKey);

            // Never index past the columns actually present in this row.
            int length = Math.min(fieldName.length, size);
            for(int i = 1; i < length; i++){
                KeyValue kv = new KeyValue(rowKey, COLUMN_FAMILY,
                        fieldName[i].getBytes(UTF8), fields.get(i).getBytes(UTF8));
                context.write(hKey, kv);
            }
        }
    }
}
本文标签: hivetohbase
版权声明:本文标题:hivetohbase 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.freenas.com.cn/jishu/1687606185h120345.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论