提交 ef7fa71a 编写于 作者: A alphg 提交者: GitHub

Add files via upload

上级 68308387
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.study</groupId>
<artifactId>hbase.bitcomparator</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>hbase.bitcomparator</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.7</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.4</version>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
</dependencies>
</project>
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// This file contains protocol buffers that are used for filters
option java_package = "com.study.hbase.bitcomparator.core.protos";
option java_outer_classname = "RowKeyBitProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
// This file contains protocol buffers that are used for comparators (e.g. in filters)
message RowKeyBitComparator {
required fixed32 md5urlhash = 1;
required fixed32 sitecodehash = 2;
required fixed32 status=3;
required fixed32 code=4;
required fixed32 sdate=5;
required fixed32 type=6;
required fixed32 free=7;
required fixed32 close=8;
required fixed32 queue=9;
required fixed32 scantype=10;
}
{ "_id" : { "$oid" : "584a6e030cf29ba18da2fcd5"} , "url" : "http://www.nmlc.gov.cn/zsyz.htm" , "md5url" : "ea67a96f233d6fcfd7cabc9a6a389283" , "status" : -1 , "code" : 404 , "stime" : 1481272834722 , "sdate" : 20161209 , "sitecode" : "1509250008" , "ip" : "10.168.106.153" , "port" : 5200 , "type" : 2 , "intime" : { "$date" : 1481272835222} , "free" : 0 , "close" : 0 , "queue" : 1 , "scantype" : 1 , "scanmemo" : ""}
{ "_id" : { "$oid" : "584a6e020cf224463e76c162"} , "url" : "http://www.xzxzzx.gov.cn:8000/wbsprj/indexlogin.do" , "md5url" : "ea67a96f233d6fcfd7cabc9a6a389283" , "status" : -1 , "code" : 503 , "stime" : 1481272828174 , "sdate" : 20161209 , "sitecode" : "3203000002" , "ip" : "10.117.8.89" , "port" : 5200 , "type" : 2 , "intime" : { "$date" : 1481272834887} , "free" : 0 , "close" : 0 , "queue" : 1 , "scantype" : 0 , "scanmemo" : ""}
{ "_id" : { "$oid" : "584a6e020cf27d1a31f617e0"} , "url" : "http://www.nmds.gov.cn/portal/bsfw/nsfd/list_1.shtml" , "md5url" : "ea67a96f233d6fcfd7cabc9a6a389283" , "status" : -1 , "code" : 404 , "stime" : 1481272822971 , "sdate" : 20161209 , "sitecode" : "15BM010001" , "ip" : "10.162.86.176" , "port" : 5200 , "type" : 2 , "intime" : { "$date" : 1481272834846} , "free" : 0 , "close" : 0 , "queue" : 0 , "scantype" : 0 , "scanmemo" : ""}
{ "_id" : { "$oid" : "584a6e020cf29ba18da2fcd4"} , "url" : "http://beijing.customs.gov.cn/publish/portal159/tab60561/" , "md5url" : "ea67a96f233d6fcfd7cabc9a6a389283" , "status" : -1 , "code" : 503 , "stime" : 1481272832559 , "sdate" : 20161209 , "sitecode" : "bm28020001" , "ip" : "10.168.106.153" , "port" : 5200 , "type" : 2 , "intime" : { "$date" : 1481272834766} , "free" : 0 , "close" : 0 , "queue" : 1 , "scantype" : 0 , "scanmemo" : ""}
{ "_id" : { "$oid" : "584a6e020cf29ba18da2fcd3"} , "url" : "http://www.nss184.com/web2/newlist_index.aspx?classid=1" , "md5url" : "ea67a96f233d6fcfd7cabc9a6a389283" , "status" : -1 , "code" : 404 , "stime" : 1481272826788 , "sdate" : 20161210 , "sitecode" : "BT10000001" , "ip" : "10.168.106.153" , "port" : 5200 , "type" : 2 , "intime" : { "$date" : 1481272834732} , "free" : 0 , "close" : 1 , "queue" : 1 , "scantype" : 0 , "scanmemo" : ""}
{ "_id" : { "$oid" : "584a6e020cf2847bb13af52c"} , "url" : "http://cgw.bjdch.gov.cn/n1569/n4860273/n9719314/index.html" , "md5url" : "ea67a96f233d6fcfd7cabc9a6a389283" , "status" : -1 , "code" : 503 , "stime" : 1481272803601 , "sdate" : 20161208 , "sitecode" : "1101010059" , "ip" : "10.117.187.7" , "port" : 5200 , "type" : 2 , "intime" : { "$date" : 1481272834150} , "free" : 1 , "close" : 0 , "queue" : 1 , "scantype" : 0 , "scanmemo" : ""}
{ "_id" : { "$oid" : "584a6e020cf29ba18da2fcd2"} , "url" : "http://www.qdn.gov.cn/zwdt/ztfw/shbzfw.htm" , "md5url" : "ea67a96f233d6fcfd7cabc9a6a389283" , "status" : -1 , "code" : 404 , "stime" : 1481272833479 , "sdate" : 20161209 , "sitecode" : "5226000038" , "ip" : "10.168.106.153" , "port" : 5200 , "type" : 2 , "intime" : { "$date" : 1481272834046} , "free" : 0 , "close" : 0 , "queue" : 1 , "scantype" : 0 , "scanmemo" : ""}
{ "_id" : { "$oid" : "584a6e010cf29ba18da2fcd1"} , "url" : "http://www.caac.gov.cn/E1/E2/" , "md5url" : "ea67a96f233d6fcfd7cabc9a6a389283" , "status" : -1 , "code" : 404 , "stime" : 1481272833297 , "sdate" : 20161209 , "sitecode" : "bm70000001" , "ip" : "10.168.106.153" , "port" : 5200 , "type" : 2 , "intime" : { "$date" : 1481272833723} , "free" : 0 , "close" : 0 , "queue" : 1 , "scantype" : 0 , "scanmemo" : ""}
{ "_id" : { "$oid" : "584a6e010cf22c906fb6f846"} , "url" : "http://www.ny.xwie.com/Thought/" , "md5url" : "ea67a96f233d6fcfd7cabc9a6a389283" , "status" : -1 , "code" : 404 , "stime" : 1481272821713 , "sdate" : 20161209 , "sitecode" : "4408250003" , "ip" : "10.168.156.196" , "port" : 5200 , "type" : 2 , "intime" : { "$date" : 1481272833498} , "free" : 0 , "close" : 0 , "queue" : 1 , "scantype" : 0 , "scanmemo" : ""}
{ "_id" : { "$oid" : "584a6e010cf29ba18da2fcd0"} , "url" : "http://www.guoluo.gov.cn/html/1746/List.html" , "md5url" : "e353cd577fd721eb71538d0938d041f7" , "status" : -1 , "code" : 404 , "stime" : 1481272832723 , "sdate" : 20161209 , "sitecode" : "6326000004" , "ip" : "10.168.106.153" , "port" : 5200 , "type" : 2 , "intime" : { "$date" : 1481272833472} , "free" : 0 , "close" : 0 , "queue" : 1 , "scantype" : 0 , "scanmemo" : ""}
{ "_id" : { "$oid" : "584a6e030cf29ba18da2fcd5"} , "url" : "http://www.nmlc.gov.cn/zsyz.htm" , "md5url" : "ea67a96f233d6fcfd7cabc9a6a389283" , "status" : -1 , "code" : 404 , "stime" : 1481272834722 , "sdate" : 20161209 , "sitecode" : "1509250008" , "ip" : "10.168.106.153" , "port" : 5200 , "type" : 2 , "intime" : { "$date" : 1481272835222} , "free" : 0 , "close" : 0 , "queue" : 1 , "scantype" : 1 , "scanmemo" : ""}
{ "_id" : { "$oid" : "584a6e020cf224463e76c162"} , "url" : "http://www.xzxzzx.gov.cn:8000/wbsprj/indexlogin.do" , "md5url" : "fd38c0fb8f6e839be56b67c69ad2baa5" , "status" : -1 , "code" : 503 , "stime" : 1481272828174 , "sdate" : 20161209 , "sitecode" : "3203000002" , "ip" : "10.117.8.89" , "port" : 5200 , "type" : 2 , "intime" : { "$date" : 1481272834887} , "free" : 0 , "close" : 0 , "queue" : 1 , "scantype" : 0 , "scanmemo" : ""}
{ "_id" : { "$oid" : "584a6e020cf27d1a31f617e0"} , "url" : "http://www.nmds.gov.cn/portal/bsfw/nsfd/list_1.shtml" , "md5url" : "d51abcd8edff79d23ca4a9a0576a1996" , "status" : -1 , "code" : 404 , "stime" : 1481272822971 , "sdate" : 20161209 , "sitecode" : "15BM010001" , "ip" : "10.162.86.176" , "port" : 5200 , "type" : 2 , "intime" : { "$date" : 1481272834846} , "free" : 0 , "close" : 0 , "queue" : 0 , "scantype" : 0 , "scanmemo" : ""}
{ "_id" : { "$oid" : "584a6e020cf29ba18da2fcd4"} , "url" : "http://beijing.customs.gov.cn/publish/portal159/tab60561/" , "md5url" : "e27bbc9192e760bacc23c226ffd90219" , "status" : -1 , "code" : 503 , "stime" : 1481272832559 , "sdate" : 20161209 , "sitecode" : "bm28020001" , "ip" : "10.168.106.153" , "port" : 5200 , "type" : 2 , "intime" : { "$date" : 1481272834766} , "free" : 0 , "close" : 0 , "queue" : 1 , "scantype" : 0 , "scanmemo" : ""}
{ "_id" : { "$oid" : "584a6e020cf29ba18da2fcd3"} , "url" : "http://www.nss184.com/web2/newlist_index.aspx?classid=1" , "md5url" : "cbc2c0571464621024c89aa019cd09ef" , "status" : -1 , "code" : 404 , "stime" : 1481272826788 , "sdate" : 20161210 , "sitecode" : "BT10000001" , "ip" : "10.168.106.153" , "port" : 5200 , "type" : 2 , "intime" : { "$date" : 1481272834732} , "free" : 0 , "close" : 1 , "queue" : 1 , "scantype" : 0 , "scanmemo" : ""}
{ "_id" : { "$oid" : "584a6e020cf2847bb13af52c"} , "url" : "http://cgw.bjdch.gov.cn/n1569/n4860273/n9719314/index.html" , "md5url" : "00a18048ed95f1c057fccc8928ddf610" , "status" : -1 , "code" : 503 , "stime" : 1481272803601 , "sdate" : 20161208 , "sitecode" : "1101010059" , "ip" : "10.117.187.7" , "port" : 5200 , "type" : 2 , "intime" : { "$date" : 1481272834150} , "free" : 1 , "close" : 0 , "queue" : 1 , "scantype" : 0 , "scanmemo" : ""}
{ "_id" : { "$oid" : "584a6e020cf29ba18da2fcd2"} , "url" : "http://www.qdn.gov.cn/zwdt/ztfw/shbzfw.htm" , "md5url" : "e6bfa0a07e773e3bab27a37f36ff221a" , "status" : -1 , "code" : 404 , "stime" : 1481272833479 , "sdate" : 20161209 , "sitecode" : "5226000038" , "ip" : "10.168.106.153" , "port" : 5200 , "type" : 2 , "intime" : { "$date" : 1481272834046} , "free" : 0 , "close" : 0 , "queue" : 1 , "scantype" : 0 , "scanmemo" : ""}
{ "_id" : { "$oid" : "584a6e010cf29ba18da2fcd1"} , "url" : "http://www.caac.gov.cn/E1/E2/" , "md5url" : "e6217482388cbc57aa80422c3f64bb35" , "status" : -1 , "code" : 404 , "stime" : 1481272833297 , "sdate" : 20161209 , "sitecode" : "bm70000001" , "ip" : "10.168.106.153" , "port" : 5200 , "type" : 2 , "intime" : { "$date" : 1481272833723} , "free" : 0 , "close" : 0 , "queue" : 1 , "scantype" : 0 , "scanmemo" : ""}
{ "_id" : { "$oid" : "584a6e010cf22c906fb6f846"} , "url" : "http://www.ny.xwie.com/Thought/" , "md5url" : "b7912f3bdb50be7b58f5a67d65273201" , "status" : -1 , "code" : 404 , "stime" : 1481272821713 , "sdate" : 20161209 , "sitecode" : "4408250003" , "ip" : "10.168.156.196" , "port" : 5200 , "type" : 2 , "intime" : { "$date" : 1481272833498} , "free" : 0 , "close" : 0 , "queue" : 1 , "scantype" : 0 , "scanmemo" : ""}
{ "_id" : { "$oid" : "584a6e010cf29ba18da2fcd0"} , "url" : "http://www.guoluo.gov.cn/html/1746/List.html" , "md5url" : "e353cd577fd721eb71538d0938d041f7" , "status" : -1 , "code" : 404 , "stime" : 1481272832723 , "sdate" : 20161209 , "sitecode" : "6326000004" , "ip" : "10.168.106.153" , "port" : 5200 , "type" : 2 , "intime" : { "$date" : 1481272833472} , "free" : 0 , "close" : 0 , "queue" : 1 , "scantype" : 0 , "scanmemo" : ""}
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://192.168.0.202:9000/hbase</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>192.168.0.202</value>
</property>
<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>2181</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
</configuration>
此差异已折叠。
因为 它太大了无法显示 source diff 。你可以改为 查看blob
package com.study.hbase.bitcomparator.core;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import com.google.protobuf.InvalidProtocolBufferException;
import com.study.hbase.bitcomparator.core.protos.RowKeyBitProtos;
/**
*
* @author 410s
*
*/
public class RowKeyEqualComparator extends ByteArrayComparable{
Logger logger=Logger.getLogger(RowKeyEqualComparator.class);
protected byte[] data;
protected byte[] templete=null;
public RowKeyEqualComparator(byte[] value) {
super(value);
this.data=value;
if(this.templete==null)
{
logger.info("hahahahahahahahhaha1");
templete=Bytes.copy(this.data);
for(int i=0;i<templete.length;i=i+4){
int k=Bytes.toInt(templete,i,4);
if(k!=0){
for(int j=0;j<4;j++){
templete[i+j]|=0xff;
}
}
}
}else{
logger.info("dadadadadadada2");
}
}
@Override
public byte[] toByteArray() {
RowKeyBitProtos.RowKeyBitComparator.Builder builder=
RowKeyBitProtos.RowKeyBitComparator.newBuilder();
int md5urlHash=Bytes.toInt(this.data, 0, 4);
int siteCodeHash=Bytes.toInt(this.data,4,4);
int status=Bytes.toInt(this.data,8,4);
int code=Bytes.toInt(this.data,12,4);
int sdate=Bytes.toInt(this.data,16,4);
int type=Bytes.toInt(this.data,20,4);
int free=Bytes.toInt(this.data,24,4);
int close=Bytes.toInt(this.data,28,4);
int gueue=Bytes.toInt(this.data,32,4);
int scantype=Bytes.toInt(this.data,36,4);
builder.setMd5Urlhash(md5urlHash);
builder.setSitecodehash(siteCodeHash);
builder.setStatus(status);
builder.setCode(code);
builder.setSdate(sdate);
builder.setType(type);
builder.setFree(free);
builder.setClose(close);
builder.setQueue(gueue);
builder.setScantype(scantype);
return builder.build().toByteArray();
}
public static RowKeyEqualComparator parseFrom(final byte[] bytes) throws DeserializationException{
RowKeyBitProtos.RowKeyBitComparator proto=null;
try{
proto=RowKeyBitProtos.RowKeyBitComparator.parseFrom(bytes);
int md5urlHash=proto.getMd5Urlhash();
int siteCodeHash=proto.getSitecodehash();
int status=proto.getStatus();
int code=proto.getCode();
int sdate=proto.getSdate();
int type=proto.getType();
int free=proto.getFree();
int close=proto.getClose();
int queue=proto.getQueue();
int scantype=proto.getScantype();
byte[][] bs={
Bytes.toBytes(md5urlHash),
Bytes.toBytes(siteCodeHash),
Bytes.toBytes(status),
Bytes.toBytes(code),
Bytes.toBytes(sdate),
Bytes.toBytes(type),
Bytes.toBytes(free),
Bytes.toBytes(close),
Bytes.toBytes(queue),
Bytes.toBytes(scantype)
};
return new RowKeyEqualComparator(Bytes.add(bs));
}catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e);
}
}
public int compareTo(byte[] value){
return compareTo(value,0,value.length);
}
@Override
public int compareTo(byte[] value, int offset, int length) {
//0 的话相等,大于小于,
if(length!=this.data.length){
return 1;
}
//复制一份data对像,用于修改,注意不可以直接 byte[] tmp=this.data;
byte[] tmp=Bytes.copy(this.data);
//第二步使用过滤模板与rowkey进行 与 操作,并将值存入tmp中
for(int i=templete.length-1;i>=0;i--){
//与操作,过滤不作比较的字段
tmp[i]=(byte) ((templete[i]&value[i+offset])&0xff);
}
//第三步判断是否相同
for(int i=tmp.length-1;i>=0;i--){
if(tmp[i]!=this.data[i])
return 1;
}
return 0;
}
}
package com.study.hbase.bitcomparator.core;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import com.google.protobuf.InvalidProtocolBufferException;
import com.study.hbase.bitcomparator.core.protos.RowKeyBitProtos;
/**
* 判断RowKey中某一段大于或小于某值,注意不能比较大于0或小于0
* 如果构造比较器的值有多段大于0的值则只判断左起第一个不为0的数字
* @author 410s
*
*/
public class RowKeyGLComparator extends ByteArrayComparable{
Logger logger=Logger.getLogger(RowKeyGLComparator.class);
protected byte[] data;
public RowKeyGLComparator(byte[] value) {
super(value);
this.data=value;
}
@Override
public int compareTo(byte[] value) {
return this.compareTo(value, 0, value.length);
}
@Override
public int compareTo(byte[] value, int offset, int length) {
//0 的话相等,大于小于,
if(length!=this.data.length){
return 0;
}
for(int i=0;i<this.data.length;i=i+4){
int k=Bytes.toInt(this.data,i,4);
if(k!=0){
logger.info("tatatata:"+(Bytes.toInt(value,i+offset,4)-k));
return k-Bytes.toInt(value,i+offset,4);
}
}
return 0;
}
@Override
public byte[] toByteArray() {
RowKeyBitProtos.RowKeyBitComparator.Builder builder=
RowKeyBitProtos.RowKeyBitComparator.newBuilder();
int md5urlHash=Bytes.toInt(this.data, 0, 4);
int siteCodeHash=Bytes.toInt(this.data,4,4);
int status=Bytes.toInt(this.data,8,4);
int code=Bytes.toInt(this.data,12,4);
int sdate=Bytes.toInt(this.data,16,4);
int type=Bytes.toInt(this.data,20,4);
int free=Bytes.toInt(this.data,24,4);
int close=Bytes.toInt(this.data,28,4);
int gueue=Bytes.toInt(this.data,32,4);
int scantype=Bytes.toInt(this.data,36,4);
builder.setMd5Urlhash(md5urlHash);
builder.setSitecodehash(siteCodeHash);
builder.setStatus(status);
builder.setCode(code);
builder.setSdate(sdate);
builder.setType(type);
builder.setFree(free);
builder.setClose(close);
builder.setQueue(gueue);
builder.setScantype(scantype);
return builder.build().toByteArray();
}
public static RowKeyGLComparator parseFrom(final byte[] bytes) throws DeserializationException{
RowKeyBitProtos.RowKeyBitComparator proto=null;
try{
proto=RowKeyBitProtos.RowKeyBitComparator.parseFrom(bytes);
int md5urlHash=proto.getMd5Urlhash();
int siteCodeHash=proto.getSitecodehash();
int status=proto.getStatus();
int code=proto.getCode();
int sdate=proto.getSdate();
int type=proto.getType();
int free=proto.getFree();
int close=proto.getClose();
int queue=proto.getQueue();
int scantype=proto.getScantype();
byte[][] bs={
Bytes.toBytes(md5urlHash),
Bytes.toBytes(siteCodeHash),
Bytes.toBytes(status),
Bytes.toBytes(code),
Bytes.toBytes(sdate),
Bytes.toBytes(type),
Bytes.toBytes(free),
Bytes.toBytes(close),
Bytes.toBytes(queue),
Bytes.toBytes(scantype)
};
return new RowKeyGLComparator(Bytes.add(bs));
}catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e);
}
}
}
package com.study.hbase.bitcomparator.main;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;
import com.study.hbase.bitcomparator.core.RowKeyEqualComparator;
import com.study.hbase.bitcomparator.core.RowKeyGLComparator;
import com.study.hbase.bitcomparator.util.HbaseUtil;
import net.sf.json.JSONObject;
public class ConnectionInfo {
private static String tableName = "connectionInfo";
private static String[] families = new String[] { "property" };
private static String fileName = "D:\\Workspace\\eclipse\\hbase.bitcomparator\\resources\\connection.json";
public static void main(String[] args) throws Exception{
//HbaseUtil.dropTable(tableName);
//HbaseUtil.createTable(tableName, families);
//HbaseUtil.insert(tableName, loadConnections());
testEqualSelect();
testGLSelect();
testAll();
}
/**
* 精确查找测试
*
* 查找md5url=00a18048ed95f1c057fccc8928ddf610,siteCode=1101010059,sdate=20161209 的数据
*
* @throws Exception
*/
public static void testEqualSelect() throws Exception{
Table table=HbaseUtil.getHTable(tableName);
Scan scan = new Scan();
List<Filter> filterList = new ArrayList<Filter>();
int md5UrlHash="00a18048ed95f1c057fccc8928ddf610".hashCode();
int siteCodeHash="1101010059".hashCode();
int sdate=20161209;
int status=0;//0;
int code=0;//0;
int type=0;
int free=0;
int close=0;
int queue=0;
int scantype=0;
byte[][] bs={
Bytes.toBytes(md5UrlHash),
Bytes.toBytes(siteCodeHash),
Bytes.toBytes(status),
Bytes.toBytes(code),
Bytes.toBytes(sdate),
Bytes.toBytes(type),
Bytes.toBytes(free),
Bytes.toBytes(close),
Bytes.toBytes(queue),
Bytes.toBytes(scantype)
};
Filter rowFilter=new RowFilter(CompareFilter.CompareOp.EQUAL,new RowKeyEqualComparator(Bytes.add(bs)));
filterList.add(rowFilter);
FilterList fls=new FilterList(filterList);
scan.setFilter(fls);
ResultScanner resultScanner = null;
try {
resultScanner = table.getScanner(scan);
for (Result result : resultScanner) {
System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("property"),Bytes.toBytes("json"))));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (resultScanner != null) {
resultScanner.close();
}
}
}
/**
* 测试范围查找 找查日期大于等于20161209 code小于502 queue小于1的数据
* @throws Exception
*/
public static void testGLSelect() throws Exception{
Table table=HbaseUtil.getHTable(tableName);
Scan scan = new Scan();
List<Filter> filterList = new ArrayList<Filter>();
int md5UrlHash=0;
int siteCodeHash=0;
int sdate=0;
int status=0;
int code=0;
int type=0;
int free=0;
int close=0;
int queue=0;
int scantype=0;
byte[][] bs={
Bytes.toBytes(md5UrlHash),
Bytes.toBytes(siteCodeHash),
Bytes.toBytes(status),
Bytes.toBytes(code),
Bytes.toBytes(20161209),
Bytes.toBytes(type),
Bytes.toBytes(free),
Bytes.toBytes(close),
Bytes.toBytes(queue),
Bytes.toBytes(scantype)
};
byte[][] bs2={
Bytes.toBytes(md5UrlHash),
Bytes.toBytes(siteCodeHash),
Bytes.toBytes(status),
Bytes.toBytes(502),
Bytes.toBytes(sdate),
Bytes.toBytes(type),
Bytes.toBytes(free),
Bytes.toBytes(close),
Bytes.toBytes(queue),
Bytes.toBytes(scantype)
};
byte[][] bs3={
Bytes.toBytes(md5UrlHash),
Bytes.toBytes(siteCodeHash),
Bytes.toBytes(status),
Bytes.toBytes(code),
Bytes.toBytes(sdate),
Bytes.toBytes(type),
Bytes.toBytes(free),
Bytes.toBytes(close),
Bytes.toBytes(1),
Bytes.toBytes(scantype)
};
Filter rowFilter=new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL,new RowKeyGLComparator(Bytes.add(bs)));
Filter rowFilter2=new RowFilter(CompareFilter.CompareOp.LESS,new RowKeyGLComparator(Bytes.add(bs2)));
Filter rowFilter3=new RowFilter(CompareFilter.CompareOp.LESS,new RowKeyGLComparator(Bytes.add(bs3)));
filterList.add(rowFilter);
filterList.add(rowFilter2);
filterList.add(rowFilter3);
FilterList fls=new FilterList(filterList);
scan.setFilter(fls);
ResultScanner resultScanner = null;
try {
resultScanner = table.getScanner(scan);
for (Result result : resultScanner) {
System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("property"),Bytes.toBytes("json"))));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (resultScanner != null) {
resultScanner.close();
}
}
}
/**
* 两种查找方式相结合 查找md5url=ea67a96f233d6fcfd7cabc9a6a389283 sdate>=20161209 cod<502 的数据
* @throws Exception
*/
public static void testAll() throws Exception{
Table table=HbaseUtil.getHTable(tableName);
Scan scan = new Scan();
List<Filter> filterList = new ArrayList<Filter>();
int md5UrlHash=0;
int siteCodeHash=0;
int sdate=0;
int status=0;
int code=0;
int type=0;
int free=0;
int close=0;
int queue=0;
int scantype=0;
byte[][] bs={
Bytes.toBytes("ea67a96f233d6fcfd7cabc9a6a389283".hashCode()),
Bytes.toBytes(siteCodeHash),
Bytes.toBytes(status),
Bytes.toBytes(code),
Bytes.toBytes(sdate),
Bytes.toBytes(type),
Bytes.toBytes(free),
Bytes.toBytes(close),
Bytes.toBytes(queue),
Bytes.toBytes(scantype)
};
byte[][] bs2={
Bytes.toBytes(md5UrlHash),
Bytes.toBytes(siteCodeHash),
Bytes.toBytes(status),
Bytes.toBytes(code),
Bytes.toBytes(20161209),
Bytes.toBytes(type),
Bytes.toBytes(free),
Bytes.toBytes(close),
Bytes.toBytes(queue),
Bytes.toBytes(scantype)
};
byte[][] bs3={
Bytes.toBytes(md5UrlHash),
Bytes.toBytes(siteCodeHash),
Bytes.toBytes(status),
Bytes.toBytes(502),
Bytes.toBytes(sdate),
Bytes.toBytes(type),
Bytes.toBytes(free),
Bytes.toBytes(close),
Bytes.toBytes(queue),
Bytes.toBytes(scantype)
};
Filter rowFilter=new RowFilter(CompareFilter.CompareOp.EQUAL,new RowKeyEqualComparator(Bytes.add(bs)));
Filter rowFilter2=new RowFilter(CompareFilter.CompareOp.GREATER,new RowKeyGLComparator(Bytes.add(bs2)));
Filter rowFilter3=new RowFilter(CompareFilter.CompareOp.LESS,new RowKeyGLComparator(Bytes.add(bs3)));
filterList.add(rowFilter);
filterList.add(rowFilter2);
filterList.add(rowFilter3);
FilterList fls=new FilterList(filterList);
scan.setFilter(fls);
ResultScanner resultScanner = null;
try {
resultScanner = table.getScanner(scan);
for (Result result : resultScanner) {
System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("property"),Bytes.toBytes("json"))));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (resultScanner != null) {
resultScanner.close();
}
}
}
/*
* 测试ConnectionRowKeyBitComparator 比较方法是否生效
*/
public static void testConnectionRowKeyBitComparator(){
int md5UrlHash=0;//"00a18048ed95f1c057fccc8928ddf610".hashCode();
int siteCodeHash=0;//"1509250008".hashCode();
int sdate=20161209;
int status=0;
int code=0;
int type=0;
int free=0;
int close=0;
int queue=0;
int scantype=0;
byte[][] bs={
Bytes.toBytes(md5UrlHash),
Bytes.toBytes(siteCodeHash),
Bytes.toBytes(status),
Bytes.toBytes(code),
Bytes.toBytes(sdate),
Bytes.toBytes(type),
Bytes.toBytes(free),
Bytes.toBytes(close),
Bytes.toBytes(queue),
Bytes.toBytes(scantype)
};
byte[][] bs2={
Bytes.toBytes(md5UrlHash),
Bytes.toBytes(siteCodeHash),
Bytes.toBytes(status),
Bytes.toBytes(code),
Bytes.toBytes(20161207),
Bytes.toBytes(0),
Bytes.toBytes(1),
Bytes.toBytes(2),
Bytes.toBytes(queue),
Bytes.toBytes(scantype)
};
RowKeyEqualComparator crkbc=new RowKeyEqualComparator(Bytes.add(bs));
RowKeyGLComparator crkglc=new RowKeyGLComparator(Bytes.add(bs));
int k=crkbc.compareTo(Bytes.add(bs2), 0, Bytes.add(bs).length);
int t=crkglc.compareTo(Bytes.add(bs2), 0, Bytes.add(bs).length);
System.out.println(k);
System.out.println(t);
}
public static Map<byte[],Map<String,Map<String,String>>> loadConnections() {
Map<byte[],Map<String,Map<String,String>>> res=new HashMap<byte[],Map<String,Map<String,String>>>();
try {
//读取文件中的每一行
List<String> lines = FileUtils.readLines(new File(fileName));
//处理
for (String str : lines) {
//属性与值的Map
Map<String, String> connMap = new HashMap<String, String>();
//将每一行的json字符串进行解析,将其每一个属性与值都存上MAP中
JSONObject jobj = JSONObject.fromObject(str);
for (Object key : jobj.keySet()) {
connMap.put(key.toString(), jobj.get(key).toString());
}
//将整行保存到一个名为json的属性中,方便我们直接取json进行解析
connMap.put("json", str);
Map<String,Map<String,String>> family=new HashMap<String,Map<String,String>>();
family.put("property", connMap);
//构建rowkey,注意区分数字字段与字符串字段,使用json取出来的都是字符串,如果是数字字段需要将其转化为数字,如果是字符串字段需要求其hash值
int md5urlHash=connMap.get("md5url").hashCode();
int siteCodeHash=connMap.get("sitecode").hashCode();
int status=Integer.parseInt(connMap.get("status"));
int code=Integer.parseInt(connMap.get("code"));
int sdate=Integer.parseInt(connMap.get("sdate"));
int type=Integer.parseInt(connMap.get("type"));
int free=Integer.parseInt(connMap.get("free"));
int close=Integer.parseInt(connMap.get("close"));
int queue=Integer.parseInt(connMap.get("queue"));
int scantype=Integer.parseInt(connMap.get("scantype"));
byte[][] bs={
Bytes.toBytes(md5urlHash),
Bytes.toBytes(siteCodeHash),
Bytes.toBytes(status),
Bytes.toBytes(code),
Bytes.toBytes(sdate),
Bytes.toBytes(type),
Bytes.toBytes(free),
Bytes.toBytes(close),
Bytes.toBytes(queue),
Bytes.toBytes(scantype)
};
res.put(Bytes.add(bs), family);
}
return res;
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return null;
}
}
}
package com.study.hbase.bitcomparator.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
/**
* habse 基本CRUD(创建,读取,更新,删除测试)
* @author 410s
*/
public class HbaseUtil {
static Connection connection;
static Admin hBaseAdmin;
static Configuration conf;
static {
conf = HBaseConfiguration.create();
conf.set("zookeeper.znode.parent","/hbase");
conf.setInt("hbase.rpc.timeout",20000);
conf.setInt("hbase.client.operation.timeout",30000);
conf.setInt("hbase.client.scanner.timeout.period",20000);
try {
connection=ConnectionFactory.createConnection(conf);
hBaseAdmin=connection.getAdmin();
} catch (IOException e1) {
e1.printStackTrace();
}
}
/**
* 创建table
* 首先检查相应的table是否已存在
* 如果不存在则创建、如果已存在则直接返回
* @param tableName 表名
* @param cFamily 列族名(family)
* @throws Exception
*/
public static void createTable(String tableName, String[] cFamilies) throws Exception {
if(hBaseAdmin.tableExists(TableName.valueOf(tableName))){
System.out.println("create table failed.the table already exists");
return;
}
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
for (String cFamily : cFamilies) {
HColumnDescriptor column = new HColumnDescriptor(cFamily);
hTableDescriptor.addFamily(column);
}
hBaseAdmin.createTable(hTableDescriptor);
System.out.println("create table successed");
}
/**
* 删除table
* 首先检查table是否存在
* 如果存在则禁用后删除,不果不存在不做任何事情
* @param tableName
* @throws Exception
*/
public static void dropTable(String tableName) throws Exception {
if (hBaseAdmin.tableExists(TableName.valueOf(tableName))) {
hBaseAdmin.disableTable(TableName.valueOf(tableName));
hBaseAdmin.deleteTable(TableName.valueOf(tableName));
}
System.out.println("drop table successed");
}
/*
* 得到Table对象
*/
public static Table getHTable(String tableName) throws Exception {
return connection.getTable(TableName.valueOf(tableName));
}
/**
* 插入数据
* @param tableName
* @param map Map<rowKey,Map<family,Map<qualifier,value>>> 需要插入的数据组织形式
* rowKey:行键
* family:列族
* qualifier:列标识
* value:值
* @throws Exception
*
*/
public static void insert(String tableName,Map<byte[],Map<String,Map<String, String>>> rows) throws Exception {
Table hTable=getHTable(tableName);
for(Entry<byte[], Map<String, Map<String, String>>> row:rows.entrySet()){
byte[] rowkey=row.getKey();
Map<String,Map<String,String>> families=row.getValue();
for(Entry<String,Map<String,String>> family:families.entrySet()){
byte[] familyName=Bytes.toBytes(family.getKey());
Map<String,String> qualifiers=family.getValue();
for(Entry<String,String> qualifier:qualifiers.entrySet()){
byte[] qualifierName=Bytes.toBytes(qualifier.getKey());
byte[] qualifierValue=Bytes.toBytes(qualifier.getValue());
Put p=new Put(rowkey);
p.addColumn(familyName,qualifierName,qualifierValue);
hTable.put(p);
}
}
}
hTable.close();
System.out.println("insert complete");
}
/**
* 查找单行
* @param tableName
* @param rowKey
* @throws Exception
*/
public static void selectOne(String tableName, String rowKey) throws Exception {
Table hTable = getHTable(tableName);
Get g1 = new Get(Bytes.toBytes(rowKey));
Result result = hTable.get(g1);
foreach(result);
System.out.println("selectOne end");
hTable.close();
}
/**
* 遍历查询结果
* @param result
* @throws Exception
*/
public static void foreach(Result result) throws Exception {
for(Cell cell:result.listCells()){
StringBuilder sb=new StringBuilder();
/*sb.append(Bytes.toString(cell.getRowArray())).append("\t");
sb.append(Bytes.toString(cell.getFamilyArray())).append("\t");
sb.append(Bytes.toString(cell.getQualifierArray())).append("\t");
sb.append(cell.getTimestamp()).append("\t");
sb.append(Bytes.toString(cell.getValueArray())).append("\t"); */
sb.append(cell.getRowLength()).append("\t");
sb.append(Bytes.toString(cell.getRow())).append("\t");
sb.append(Bytes.toString(cell.getFamily())).append("\t");
sb.append(Bytes.toString(cell.getQualifier())).append("\t");
sb.append(cell.getTimestamp()).append("\t");
sb.append(Bytes.toString(cell.getValue())).append("\t");
System.out.println(sb.toString());
}
}
public static void delete(String tableName, String rowKey) throws Exception {
Table hTable = getHTable(tableName);
List<Delete> list = new ArrayList<Delete>();
Delete d1 = new Delete(Bytes.toBytes(rowKey));
list.add(d1);
hTable.delete(list);
Get g1 = new Get(Bytes.toBytes(rowKey));
Result result = hTable.get(g1);
System.out.println("Get: " + result);
System.out.println("delete successed");
hTable.close();
}
public static void selectAll(String tableName) throws Exception {
Table hTable = getHTable(tableName);
Scan scan = new Scan();
ResultScanner resultScanner = null;
try {
resultScanner = hTable.getScanner(scan);
for (Result result : resultScanner) {
foreach(result);
}
}
catch (Exception e) {
e.printStackTrace();
}
finally {
if (resultScanner != null) {
resultScanner.close();
}
}
System.out.println("selectAll end");
hTable.close();
}
public static void main(String[] args) throws Exception{
String tableName="testtable";
String[] cfamily={"C1","C2"};
dropTable(tableName);
createTable(tableName,cfamily);
//Map<rowKey,Map<family,Map<column,value>>>
Map<String,String> qualifiers=new HashMap<String,String>();
qualifiers.put("column1", "abc");
qualifiers.put("column2", "def");
Map<String,Map<String,String>> cFamilies=new HashMap<String,Map<String,String>>();
cFamilies.put("C1", qualifiers);
Map<byte[],Map<String,Map<String,String>>> rows=new HashMap<byte[],Map<String,Map<String,String>>>();
rows.put(Bytes.toBytes("row1"), cFamilies);
insert(tableName,rows);
selectOne(tableName,"row1");
selectAll(tableName);
delete(tableName,"row1");
selectAll(tableName);
}
}
package com.study.hbase.bitcomparator.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
public class HbaseUtil_back {
static Connection connection;
static Admin hBaseAdmin;
static Configuration conf;
static {
conf = HBaseConfiguration.create();
conf.set("zookeeper.znode.parent","/hbase");
conf.setInt("hbase.rpc.timeout",20000);
conf.setInt("hbase.client.operation.timeout",30000);
conf.setInt("hbase.client.scanner.timeout.period",20000);
try {
connection=ConnectionFactory.createConnection(conf);
hBaseAdmin=connection.getAdmin();
} catch (IOException e1) {
e1.printStackTrace();
}
}
/**
* 创建table
* 首先检查相应的table是否已存在
* 如果不存在则创建、如果已存在则直接返回
* @param tableName 表名
* @param cFamily 列族名(family)
* @throws Exception
*/
public static void createTable(String tableName, String[] cFamilies) throws Exception {
if(hBaseAdmin.tableExists(TableName.valueOf(tableName))){
System.out.println("create table failed.the table already exists");
return;
}
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
for (String cFamily : cFamilies) {
HColumnDescriptor column = new HColumnDescriptor(cFamily);
hTableDescriptor.addFamily(column);
}
hBaseAdmin.createTable(hTableDescriptor);
System.out.println("create table successed");
}
/**
* 删除table
* 首先检查table是否存在
* 如果存在则禁用后删除,不果不存在不做任何事情
* @param tableName
* @throws Exception
*/
public static void dropTable(String tableName) throws Exception {
if (hBaseAdmin.tableExists(TableName.valueOf(tableName))) {
hBaseAdmin.disableTable(TableName.valueOf(tableName));
hBaseAdmin.deleteTable(TableName.valueOf(tableName));
}
System.out.println("drop table successed");
}
/*
* 得到Table对象
*/
public static Table getHTable(String tableName) throws Exception {
return connection.getTable(TableName.valueOf(tableName));
}
/**
* 插入数据
* @param tableName
* @param map Map<rowKey,Map<family,Map<qualifier,value>>> 需要插入的数据组织形式
* rowKey:行键
* family:列族
* qualifier:列标识
* value:值
* @throws Exception
*
*/
public static void insert(String tableName,Map<String,Map<String,Map<String, String>>> rows) throws Exception {
Table hTable=getHTable(tableName);
for(Entry<String, Map<String, Map<String, String>>> row:rows.entrySet()){
byte[] rowName=Bytes.toBytes(row.getKey());
Map<String,Map<String,String>> families=row.getValue();
for(Entry<String,Map<String,String>> family:families.entrySet()){
byte[] familyName=Bytes.toBytes(family.getKey());
Map<String,String> qualifiers=family.getValue();
for(Entry<String,String> qualifier:qualifiers.entrySet()){
byte[] qualifierName=Bytes.toBytes(qualifier.getKey());
byte[] qualifierValue=Bytes.toBytes(qualifier.getValue());
Put p=new Put(rowName);
p.addColumn(familyName,qualifierName,qualifierValue);
hTable.put(p);
}
}
}
hTable.close();
System.out.println("insert complete");
}
/**
* 查找单行
* @param tableName
* @param rowKey
* @throws Exception
*/
public static void selectOne(String tableName, String rowKey) throws Exception {
Table hTable = getHTable(tableName);
Get g1 = new Get(Bytes.toBytes(rowKey));
Result result = hTable.get(g1);
foreach(result);
System.out.println("selectOne end");
hTable.close();
}
/**
* 遍历查询结果
* @param result
* @throws Exception
*/
public static void foreach(Result result) throws Exception {
for(Cell cell:result.listCells()){
StringBuilder sb=new StringBuilder();
/*sb.append(Bytes.toString(cell.getRowArray())).append("\t");
sb.append(Bytes.toString(cell.getFamilyArray())).append("\t");
sb.append(Bytes.toString(cell.getQualifierArray())).append("\t");
sb.append(cell.getTimestamp()).append("\t");
sb.append(Bytes.toString(cell.getValueArray())).append("\t");*/
sb.append(Bytes.toString(cell.getRow())).append("\t");
sb.append(Bytes.toString(cell.getFamily())).append("\t");
sb.append(Bytes.toString(cell.getQualifier())).append("\t");
sb.append(cell.getTimestamp()).append("\t");
sb.append(Bytes.toString(cell.getValue())).append("\t");
System.out.println(sb.toString());
}
}
public static void delete(String tableName, String rowKey) throws Exception {
Table hTable = getHTable(tableName);
List<Delete> list = new ArrayList<Delete>();
Delete d1 = new Delete(Bytes.toBytes(rowKey));
list.add(d1);
hTable.delete(list);
Get g1 = new Get(Bytes.toBytes(rowKey));
Result result = hTable.get(g1);
System.out.println("Get: " + result);
System.out.println("delete successed");
hTable.close();
}
public static void selectAll(String tableName) throws Exception {
Table hTable = getHTable(tableName);
Scan scan = new Scan();
ResultScanner resultScanner = null;
try {
resultScanner = hTable.getScanner(scan);
for (Result result : resultScanner) {
foreach(result);
}
}
catch (Exception e) {
e.printStackTrace();
}
finally {
if (resultScanner != null) {
resultScanner.close();
}
}
System.out.println("selectAll end");
hTable.close();
}
}
package com.study.hbase.bitcomparator;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import com.study.hbase.bitcomparator.core.RowKeyEqualComparator;
import com.study.hbase.bitcomparator.util.HbaseUtil_back;
import net.sf.json.JSONObject;
public class ConnectionInfoTest {
private static String tableName = "connectionInfo";
private static String[] columns = new String[] { "property", "json" };
private static String fileName = "D:\\Workspace\\eclipse\\hbase.bitcomparator\\resources\\connection.json";
private static HTable table=null;
static{
try {
table=(HTable) HbaseUtil_back.getHTable(tableName);
} catch (Exception e) {
System.out.println("create table error");
System.exit(1);
}
}
/**
* 第一个版本的数据读取,RowKey为md5url+stime
* @return
*/
public static List<Map.Entry<String, Map<String, String>>> loadConnections() {
List<Map.Entry<String, Map<String, String>>> res = new ArrayList<Map.Entry<String, Map<String, String>>>();
try {
List<String> lines = FileUtils.readLines(new File(fileName));
for (String str : lines) {
Map<String, String> connMap = new HashMap<String, String>();
JSONObject jobj = JSONObject.fromObject(str);
for (Object key : jobj.keySet()) {
connMap.put(key.toString(), jobj.get(key).toString());
}
connMap.put("json", str);
Map<String, Map<String, String>> con = new HashMap<String, Map<String, String>>();
con.put(connMap.get("md5url")+connMap.get("stime"), connMap);
res.addAll(con.entrySet());
}
return res;
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return null;
}
}
/**
* 第二个版本的数据读取,RowKey为md5UrlHash+siteCodeHash+sTimeHash 中间会有转为String不是很好。
* @return
*/
public static List<Map.Entry<String, Map<String, String>>> loadConnections2() {
List<Map.Entry<String, Map<String, String>>> res = new ArrayList<Map.Entry<String, Map<String, String>>>();
try {
List<String> lines = FileUtils.readLines(new File(fileName));
for (String str : lines) {
Map<String, String> connMap = new HashMap<String, String>();
JSONObject jobj = JSONObject.fromObject(str);
for (Object key : jobj.keySet()) {
connMap.put(key.toString(), jobj.get(key).toString());
}
connMap.put("json", str);
Map<String, Map<String, String>> con = new HashMap<String, Map<String, String>>();
int md5UrlHash=connMap.get("md5url").hashCode();
int siteCodeHash=connMap.get("sitecode").hashCode();
int sTimeHash=connMap.get("stime").hashCode();
con.put(Bytes.toString(Bytes.add(Bytes.toBytes(md5UrlHash),Bytes.toBytes(siteCodeHash))), connMap);
res.addAll(con.entrySet());
}
return res;
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return null;
}
}
/**
* 第三个版本 RowKey 直接为哈希值转byte(较第二个版本少了转字符串的过程)
* @return
*/
public static List<Map.Entry<byte[], Map<String, String>>> loadConnections3() {
List<Map.Entry<byte[], Map<String, String>>> res = new ArrayList<Map.Entry<byte[], Map<String, String>>>();
try {
List<String> lines = FileUtils.readLines(new File(fileName));
for (String str : lines) {
Map<String, String> connMap = new HashMap<String, String>();
JSONObject jobj = JSONObject.fromObject(str);
for (Object key : jobj.keySet()) {
connMap.put(key.toString(), jobj.get(key).toString());
}
connMap.put("json", str);
Map<byte[], Map<String, String>> con = new HashMap<byte[], Map<String, String>>();
int md5UrlHash=connMap.get("md5url").hashCode();
int siteCodeHash=connMap.get("sitecode").hashCode();
int sTimeHash=connMap.get("stime").hashCode();
con.put(Bytes.add(Bytes.toBytes(md5UrlHash),Bytes.toBytes(siteCodeHash),Bytes.toBytes(sTimeHash)), connMap);
res.addAll(con.entrySet());
}
return res;
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return null;
}
}
/**
* 第四个版本 测试Rowkey为0
* @return
*/
public static List<Map.Entry<byte[], Map<String, String>>> loadConnections4() {
List<Map.Entry<byte[], Map<String, String>>> res = new ArrayList<Map.Entry<byte[], Map<String, String>>>();
try {
List<String> lines = FileUtils.readLines(new File(fileName));
for (String str : lines) {
Map<String, String> connMap = new HashMap<String, String>();
JSONObject jobj = JSONObject.fromObject(str);
for (Object key : jobj.keySet()) {
connMap.put(key.toString(), jobj.get(key).toString());
}
connMap.put("json", str);
Map<byte[], Map<String, String>> con = new HashMap<byte[], Map<String, String>>();
int md5UrlHash=0;
int siteCodeHash=0;
int sTimeHash=0;
con.put(Bytes.add(Bytes.toBytes(md5UrlHash),Bytes.toBytes(siteCodeHash),Bytes.toBytes(sTimeHash)), connMap);
res.addAll(con.entrySet());
}
return res;
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return null;
}
}
/**
* 插入数据
*
* @param tableName
* @param rows
* @throws Exception
*/
public static void insert(String tableName, List<Map.Entry<String, Map<String, String>>> rows) throws Exception {
HTable htable = table;
int count = 0;
for (Map.Entry<String, Map<String, String>> row : rows) {
byte[] rowKey = Bytes.toBytes(row.getKey());
Put p1 = new Put(rowKey);
Map<String, String> values = row.getValue();
for (String key : values.keySet()) {
byte[] value = Bytes.toBytes(values.get(key));
byte[] family = Bytes.toBytes("property");
if (key.equals("json")) {
family = Bytes.toBytes("json");
}
byte[] qualifier = Bytes.toBytes(key);
p1.add(family, qualifier, value);
}
htable.put(p1);
count++;
}
System.out.println("insert " + count + " rows");
}
/**
* 插入数据
*
* @param tableName
* @param rows
* @throws Exception
*/
public static void insert2(String tableName, List<Map.Entry<byte[], Map<String, String>>> rows) throws Exception {
HTable htable = table;
int count = 0;
for (Map.Entry<byte[], Map<String, String>> row : rows) {
byte[] rowKey =row.getKey();
Put p1 = new Put(rowKey);
Map<String, String> values = row.getValue();
for (String key : values.keySet()) {
byte[] value = Bytes.toBytes(values.get(key));
byte[] family = Bytes.toBytes("property");
if (key.equals("json")) {
family = Bytes.toBytes("json");
}
byte[] qualifier = Bytes.toBytes(key);
p1.add(family, qualifier, value);
}
htable.put(p1);
count++;
}
System.out.println("insert " + count + " rows");
}
//使用RowFilter过滤器
public static List<String> findRowKeysByProperty(String property,String value) throws Exception{
List<String>res=new ArrayList<String>();
HTable hTable = table;
Scan scan = new Scan();
List<Filter> filterList = new ArrayList<Filter>();
Filter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL,
new BinaryComparator(Bytes.toBytes("property")));
Filter qualifierFilter = new QualifierFilter(CompareFilter.CompareOp.EQUAL,
new BinaryComparator(Bytes.toBytes(property)));
Filter valueFilter=new ValueFilter(CompareFilter.CompareOp.EQUAL,
new BinaryComparator(Bytes.toBytes(value+"")));
filterList.add(familyFilter);
filterList.add(qualifierFilter);
filterList.add(valueFilter);
FilterList fls=new FilterList(filterList);
scan.setFilter(fls);
ResultScanner resultScanner = null;
try {
resultScanner = hTable.getScanner(scan);
for (Result result : resultScanner) {
res.add(Bytes.toString(result.getRow()));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (resultScanner != null) {
resultScanner.close();
}
}
return res;
}
public static List<String> getJsonByRowKeys(List<String> rowKeys) throws IOException{
List<String> res=new ArrayList<String>();
for(String rowKey:rowKeys){
Get get=new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes("json"),Bytes.toBytes("json"));
Result detail = table.get(get);
res.add(Bytes.toString(detail.getValue(Bytes.toBytes("json"),Bytes.toBytes("json"))));
}
return res;
}
public static void printAll() throws Exception{
HbaseUtil_back.selectAll(tableName);
}
/**
* RowKey为md5UrlHash+siteCodeHash+sTimeHas 转字符串
* @throws IOException
*/
public static void find1() throws IOException{
int md5UrlHash="ea67a96f233d6fcfd7cabc9a6a389283".hashCode();
int siteCodeHash="1509250008".hashCode();
int sTimeHash="1481272834722".hashCode();
Get get=new Get(Bytes.toBytes(Bytes.toString(Bytes.add(Bytes.toBytes(md5UrlHash),Bytes.toBytes(siteCodeHash)))));
get.addColumn(Bytes.toBytes("json"),Bytes.toBytes("json"));
Result detail = table.get(get);
System.out.println(Bytes.toString(detail.getValue(Bytes.toBytes("json"),Bytes.toBytes("json"))));
//con.put(Bytes.toString(Bytes.add(Bytes.toBytes(md5UrlHash),Bytes.toBytes(siteCodeHash))), connMap);
//res.addAll(con.entrySet());
}
//直接要据int值转为byte后查找
public static void find2() throws IOException{
int md5UrlHash="ea67a96f233d6fcfd7cabc9a6a389283".hashCode();
int siteCodeHash="1509250008".hashCode();
int sTimeHash="1481272834722".hashCode();
Get get=new Get(Bytes.add(Bytes.toBytes(md5UrlHash),Bytes.toBytes(siteCodeHash),Bytes.toBytes(sTimeHash)));
get.addColumn(Bytes.toBytes("json"),Bytes.toBytes("json"));
Result detail = table.get(get);
System.out.println(Bytes.toString(detail.getValue(Bytes.toBytes("json"),Bytes.toBytes("json"))));
}
public static void testIntToByteLen(){
int md5UrlHash=0;
int siteCodeHash=0;
int sTimeHash=0;
byte[] bs=Bytes.add(Bytes.toBytes(md5UrlHash),Bytes.toBytes(siteCodeHash),Bytes.toBytes(sTimeHash));
System.out.println(bs.length);
}
//查找全0 rowkey
public static void find3() throws IOException{
int md5UrlHash=0;
int siteCodeHash=0;
int sTimeHash=0;
Get get=new Get(Bytes.add(Bytes.toBytes(md5UrlHash),Bytes.toBytes(siteCodeHash),Bytes.toBytes(sTimeHash)));
get.addColumn(Bytes.toBytes("json"),Bytes.toBytes("json"));
Result detail = table.get(get);
System.out.println(Bytes.toString(detail.getValue(Bytes.toBytes("json"),Bytes.toBytes("json"))));
}
public static void test() throws Exception {
long start=System.currentTimeMillis();
List<String> rowKey1=findRowKeysByProperty("sitecode","1101010059");
List<String> rowKey2=findRowKeysByProperty("sdate","20161209");
List<String> rowKey3=findRowKeysByProperty("md5url","00a18048ed95f1c057fccc8928ddf610");
rowKey1.retainAll(rowKey2);
rowKey1.retainAll(rowKey3);
for(String str:getJsonByRowKeys(rowKey1)){
System.out.println(str);
}
long end=System.currentTimeMillis();
System.out.println(end-start);
}
//rowkey 由hashcode转字符串再转byte
public static void test1() throws Exception{
HbaseUtil_back.createTable(tableName, columns);
insert(tableName,loadConnections2());
printAll();
long start=System.currentTimeMillis();
find1();
long end=System.currentTimeMillis();
System.out.println(end-start);
}
//rowkey 由hashcode直接转byte
public static void test2() throws Exception{
HbaseUtil_back.createTable(tableName, columns);
insert2(tableName,loadConnections3());
printAll();
long start=System.currentTimeMillis();
find2();
long end=System.currentTimeMillis();
System.out.println(end-start);
}
//rowkey 值为0
public static void testZeroKey() throws Exception{
HbaseUtil_back.createTable(tableName, columns);
insert2(tableName,loadConnections4());
printAll();
long start=System.currentTimeMillis();
find3();
long end=System.currentTimeMillis();
}
//使用 自定义的比较器
public static void testComparator() throws Exception{
// HbaseUtil.createTable(tableName, columns);
// insert2(tableName,loadConnections3());
// printAll();
Scan scan = new Scan();
List<Filter> filterList = new ArrayList<Filter>();
int md5UrlHash=0;//"ea67a96f233d6fcfd7cabc9a6a389283".hashCode();
int siteCodeHash="1509250008".hashCode();
int sTimeHash=0;//"1481272834722".hashCode();
Filter rowFilter=new RowFilter(CompareFilter.CompareOp.EQUAL,new RowKeyEqualComparator(Bytes.add(Bytes.toBytes(md5UrlHash),Bytes.toBytes(siteCodeHash),Bytes.toBytes(sTimeHash))));
filterList.add(rowFilter);
FilterList fls=new FilterList(filterList);
scan.setFilter(fls);
ResultScanner resultScanner = null;
try {
resultScanner = table.getScanner(scan);
for (Result result : resultScanner) {
// HbaseUtil.foreach(result);
System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("json"),Bytes.toBytes("json"))));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (resultScanner != null) {
resultScanner.close();
}
}
}
/*
* 测试ConnectionRowKeyBitComparator 比较方法是否生效
*/
public static void testConnectionRowKeyBitComparator(){
int md5UrlHash="ea67a96f233d6fcfd7cabc9a6a389283".hashCode();
int siteCodeHash="1509250008".hashCode();
int sTimeHash="1481272834722".hashCode();
RowKeyEqualComparator crkbc=new RowKeyEqualComparator(Bytes.add(Bytes.toBytes(0),Bytes.toBytes(siteCodeHash),Bytes.toBytes(0)));
int k=crkbc.compareTo(Bytes.add(Bytes.toBytes(md5UrlHash),Bytes.toBytes(0),Bytes.toBytes(sTimeHash)), 0, 12);
System.out.println(k);
}
public static void main(String[] args) throws Exception {
//testConnectionRowKeyBitComparator();
testComparator() ;
/*byte a=(byte) 0xfd;
byte b=(byte) 0xff;
byte c=(byte) (a^b);
System.out.println(c);*/
}
}
package com.study.hbase.test.basic;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
/**
* habse 基本CRUD(创建,读取,更新,删除测试)
* @author 410s
*/
public class CRUD {
static Connection connection;
static Admin hBaseAdmin;
static Configuration conf;
static {
conf = HBaseConfiguration.create();
conf.set("zookeeper.znode.parent","/hbase");
conf.setInt("hbase.rpc.timeout",20000);
conf.setInt("hbase.client.operation.timeout",30000);
conf.setInt("hbase.client.scanner.timeout.period",20000);
try {
connection=ConnectionFactory.createConnection(conf);
hBaseAdmin=connection.getAdmin();
} catch (IOException e1) {
e1.printStackTrace();
}
}
/**
* 创建table
* 首先检查相应的table是否已存在
* 如果不存在则创建、如果已存在则直接返回
* @param tableName 表名
* @param cFamily 列族名(family)
* @throws Exception
*/
public static void createTable(String tableName, String[] cFamilies) throws Exception {
if(hBaseAdmin.tableExists(TableName.valueOf(tableName))){
System.out.println("create table failed.the table already exists");
return;
}
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
for (String cFamily : cFamilies) {
HColumnDescriptor column = new HColumnDescriptor(cFamily);
hTableDescriptor.addFamily(column);
}
hBaseAdmin.createTable(hTableDescriptor);
System.out.println("create table successed");
}
/**
* 删除table
* 首先检查table是否存在
* 如果存在则禁用后删除,不果不存在不做任何事情
* @param tableName
* @throws Exception
*/
public static void dropTable(String tableName) throws Exception {
if (hBaseAdmin.tableExists(TableName.valueOf(tableName))) {
hBaseAdmin.disableTable(TableName.valueOf(tableName));
hBaseAdmin.deleteTable(TableName.valueOf(tableName));
}
System.out.println("drop table successed");
}
/*
* 得到Table对象
*/
public static Table getHTable(String tableName) throws Exception {
return connection.getTable(TableName.valueOf(tableName));
}
/**
* 插入数据
* @param tableName
* @param map Map<rowKey,Map<family,Map<qualifier,value>>> 需要插入的数据组织形式
* rowKey:行键
* family:列族
* qualifier:列标识
* value:值
* @throws Exception
*
*/
public static void insert(String tableName,Map<String,Map<String,Map<String, String>>> rows) throws Exception {
Table hTable=getHTable(tableName);
for(Entry<String, Map<String, Map<String, String>>> row:rows.entrySet()){
byte[] rowName=Bytes.toBytes(row.getKey());
Map<String,Map<String,String>> families=row.getValue();
for(Entry<String,Map<String,String>> family:families.entrySet()){
byte[] familyName=Bytes.toBytes(family.getKey());
Map<String,String> qualifiers=family.getValue();
for(Entry<String,String> qualifier:qualifiers.entrySet()){
byte[] qualifierName=Bytes.toBytes(qualifier.getKey());
byte[] qualifierValue=Bytes.toBytes(qualifier.getValue());
Put p=new Put(rowName);
p.addColumn(familyName,qualifierName,qualifierValue);
hTable.put(p);
}
}
}
hTable.close();
System.out.println("insert complete");
}
/**
* 查找单行
* @param tableName
* @param rowKey
* @throws Exception
*/
public static void selectOne(String tableName, String rowKey) throws Exception {
Table hTable = getHTable(tableName);
Get g1 = new Get(Bytes.toBytes(rowKey));
Result result = hTable.get(g1);
foreach(result);
System.out.println("selectOne end");
hTable.close();
}
/**
* 遍历查询结果
* @param result
* @throws Exception
*/
public static void foreach(Result result) throws Exception {
for(Cell cell:result.listCells()){
StringBuilder sb=new StringBuilder();
/*sb.append(Bytes.toString(cell.getRowArray())).append("\t");
sb.append(Bytes.toString(cell.getFamilyArray())).append("\t");
sb.append(Bytes.toString(cell.getQualifierArray())).append("\t");
sb.append(cell.getTimestamp()).append("\t");
sb.append(Bytes.toString(cell.getValueArray())).append("\t");*/
sb.append(Bytes.toString(cell.getRow())).append("\t");
sb.append(Bytes.toString(cell.getFamily())).append("\t");
sb.append(Bytes.toString(cell.getQualifier())).append("\t");
sb.append(cell.getTimestamp()).append("\t");
sb.append(Bytes.toString(cell.getValue())).append("\t");
System.out.println(sb.toString());
}
}
public static void delete(String tableName, String rowKey) throws Exception {
Table hTable = getHTable(tableName);
List<Delete> list = new ArrayList<Delete>();
Delete d1 = new Delete(Bytes.toBytes(rowKey));
list.add(d1);
hTable.delete(list);
Get g1 = new Get(Bytes.toBytes(rowKey));
Result result = hTable.get(g1);
System.out.println("Get: " + result);
System.out.println("delete successed");
hTable.close();
}
public static void selectAll(String tableName) throws Exception {
Table hTable = getHTable(tableName);
Scan scan = new Scan();
ResultScanner resultScanner = null;
try {
resultScanner = hTable.getScanner(scan);
for (Result result : resultScanner) {
foreach(result);
}
}
catch (Exception e) {
e.printStackTrace();
}
finally {
if (resultScanner != null) {
resultScanner.close();
}
}
System.out.println("selectAll end");
hTable.close();
}
public static void main(String[] args) throws Exception{
String tableName="testtable";
String[] cfamily={"C1","C2"};
dropTable(tableName);
createTable(tableName,cfamily);
//Map<rowKey,Map<family,Map<column,value>>>
Map<String,String> qualifiers=new HashMap<String,String>();
qualifiers.put("column1", "abc");
qualifiers.put("column2", "def");
Map<String,Map<String,String>> cFamilies=new HashMap<String,Map<String,String>>();
cFamilies.put("C1", qualifiers);
Map<String,Map<String,Map<String,String>>> rows=new HashMap<String,Map<String,Map<String,String>>>();
rows.put("row1", cFamilies);
insert(tableName,rows);
selectOne(tableName,"row1");
selectAll(tableName);
delete(tableName,"row1");
selectAll(tableName);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册