這篇文章主要講解了“hbase怎么在不同版本hdfs集群之間轉移數(shù)據(jù)”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“hbase怎么在不同版本hdfs集群之間轉移數(shù)據(jù)”吧!
十余年的監(jiān)利網(wǎng)站建設經驗,針對設計、前端、開發(fā)、售后、文案、推廣等六對一服務,響應快,48小時及時工作處理。成都營銷網(wǎng)站建設的優(yōu)勢是能夠根據(jù)用戶設備顯示端的尺寸不同,自動調整監(jiān)利建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設計,從而大程度地提升瀏覽體驗。創(chuàng)新互聯(lián)建站從事“監(jiān)利網(wǎng)站設計”,“監(jiān)利網(wǎng)站推廣”以來,每個客戶項目都認真落實執(zhí)行。
最簡單的辦法就是把src集群的數(shù)據(jù)導到本地,然后起另一個進程將本地數(shù)據(jù)傳到des集群上去。
不過這有幾個問題:
效率降低
占用本地磁盤空間
不能應付實時導數(shù)據(jù)需求
兩個進程需要協(xié)調,復雜度增加
更好的辦法是在同一個進程內一邊讀src數(shù)據(jù),一邊寫des集群。不過這相當于在同一個進程空間內加載兩個版本的hadoop jar包,這就需要在程序中使用兩個classloader來實現(xiàn)。
以下代碼可以實現(xiàn)classloader加載自定義的jar包,并生成需要的Configuration對象:
Java代碼
URL[] jarUrls = new URL[1];
jarUrls[0]=new File(des_jar_path).toURI().toURL();
ClassLoader jarloader = new URLClassLoader(jarUrls, null);
Class Proxy = Class.forName("yourclass", true, jarloader);
Configuration conf = (Configuration)Proxy.newInstance();
URL[] jarUrls = new URL[1];
jarUrls[0]=new File(des_jar_path).toURI().toURL();
ClassLoader jarloader = new URLClassLoader(jarUrls, null);
Class Proxy = Class.forName("yourclass", true, jarloader);
Configuration conf = (Configuration)Proxy.newInstance();
但是由于在生成HTable對象時,需要使用這個conf對象,而加載這個conf對象的代碼本身是由默認的classloader加載的,也就是0.19.2的jar包。所以在以上代碼最后一行所強制轉換的Configuration對象仍然是0.19.2版本的。那怎么辦呢?
琢磨了一會,發(fā)現(xiàn)如果要實現(xiàn)以上功能,必須將生成HTable對象,以及以后的所有hbase操作都使用這個新的classloader,因此這個新的classloader必須加載除了0.19.2的jar包外所有需要用到的jar包,然后把所有操作都封裝進去。在外面用反射來調用。
這樣的話,通常構造函數(shù)都不為空了,因此需要用到Constructor來構造一個自定義的構造函數(shù)
代碼段如下:
Java代碼
main.java
void init(){
ClassLoader jarloader = generateJarLoader();
Class Proxy = Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);
Constructor con = Proxy.getConstructor(new Class[]{String.class, String.class, boolean.class});
Boolean autoflush = param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);
proxy = con.newInstance(new Object[]{path, tablename, autoflush});
}
void put(){
...
while((line = getLine()) != null) {
proxy.getClass().getMethod("generatePut",String.class).invoke(proxy, line.getField(rowkey));
Method addPut = proxy.getClass().getMethod("addPut",
new Class[]{String.class, String.class, String.class});
addPut.invoke(proxy, new Object[]{field, column, encode});
proxy.getClass().getMethod("putLine").invoke(proxy);
}
}
ClassLoader generateJarLoader() throws IOException {
String libPath = System.getProperty("java.ext.dirs");
FileFilter filter = new FileFilter() {
@Override
public boolean accept(File pathname) {
if(pathname.getName().startsWith("hadoop-0.19.2"))
return false;
else
return pathname.getName().endsWith(".jar");
}
};
File[] jars = new File(libPath).listFiles(filter);
URL[] jarUrls = new URL[jars.length+1];
int k = 0;
for (int i = 0; i < jars.length; i++) {
jarUrls[k++] = jars[i].toURI().toURL();
}
jarUrls[k] = new File("hadoop-0.20.205.jar")
ClassLoader jarloader = new URLClassLoader(jarUrls, null);
return jarloader;
}
main.java
void init(){
ClassLoader jarloader = generateJarLoader();
Class Proxy = Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);
Constructor con = Proxy.getConstructor(new Class[]{String.class, String.class, boolean.class});
Boolean autoflush = param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);
proxy = con.newInstance(new Object[]{path, tablename, autoflush});
}
void put(){
...
while((line = getLine()) != null) {
proxy.getClass().getMethod("generatePut",String.class).invoke(proxy, line.getField(rowkey));
Method addPut = proxy.getClass().getMethod("addPut",
new Class[]{String.class, String.class, String.class});
addPut.invoke(proxy, new Object[]{field, column, encode});
proxy.getClass().getMethod("putLine").invoke(proxy);
}
}
ClassLoader generateJarLoader() throws IOException {
String libPath = System.getProperty("java.ext.dirs");
FileFilter filter = new FileFilter() {
@Override
public boolean accept(File pathname) {
if(pathname.getName().startsWith("hadoop-0.19.2"))
return false;
else
return pathname.getName().endsWith(".jar");
}
};
File[] jars = new File(libPath).listFiles(filter);
URL[] jarUrls = new URL[jars.length+1];
int k = 0;
for (int i = 0; i < jars.length; i++) {
jarUrls[k++] = jars[i].toURI().toURL();
}
jarUrls[k] = new File("hadoop-0.20.205.jar")
ClassLoader jarloader = new URLClassLoader(jarUrls, null);
return jarloader;
}
Java代碼
HBaseProxy.java
public HBaseProxy(String hbase_conf, String tableName, boolean autoflush)
throws IOException{
Configuration conf = new Configuration();
conf.addResource(new Path(hbase_conf));
config = new Configuration(conf);
htable = new HTable(config, tableName);
admin = new HBaseAdmin(config);
htable.setAutoFlush(autoflush);
}
public void addPut(String field, String column, String encode) throws IOException {
try {
p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),
field.getBytes(encode));
} catch (UnsupportedEncodingException e) {
p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),
field.getBytes());
}
}
public void generatePut(String rowkey){
p = new Put(rowkey.getBytes());
}
public void putLine() throws IOException{
htable.put(p);
}
HBaseProxy.java
public HBaseProxy(String hbase_conf, String tableName, boolean autoflush)
throws IOException{
Configuration conf = new Configuration();
conf.addResource(new Path(hbase_conf));
config = new Configuration(conf);
htable = new HTable(config, tableName);
admin = new HBaseAdmin(config);
htable.setAutoFlush(autoflush);
}
public void addPut(String field, String column, String encode) throws IOException {
try {
p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),
field.getBytes(encode));
} catch (UnsupportedEncodingException e) {
p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),
field.getBytes());
}
}
public void generatePut(String rowkey){
p = new Put(rowkey.getBytes());
}
public void putLine() throws IOException{
htable.put(p);
}
總之,在同一個進程中加載多個classloader時一定要注意,classloader A所加載的對象是不能轉換成classloader B的對象的,當然也不能使用。兩個空間的相互調用只能用java的基本類型或是反射。
感謝各位的閱讀,以上就是“hbase怎么在不同版本hdfs集群之間轉移數(shù)據(jù)”的內容了,經過本文的學習后,相信大家對hbase怎么在不同版本hdfs集群之間轉移數(shù)據(jù)這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關知識點的文章,歡迎關注!