生活随笔
收集整理的這篇文章主要介紹了
java实现对HDFS增删改查(CRUD)等操作
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
實現對HDFS增刪改查CRUD等操作
1 查找
列出某個目錄下的文件名稱,hdfs命令如下所示:
hdfs dfs –ls/usr/app
java代碼片段:
[plain] view plain
copy print?
public?void?list(String?srcPath)?{?????????????Configuration?conf?=?new?Configuration();?????????????LOG.info("[Defaultfs]?:"?+conf.get("fs.default.name"));???????????conf.set("hadoop.job.ugi","app,app");???//It?is?not?necessary?for?the?default?user.?????????????FileSystem?fs;????????????try?{?????????????????????fs=?FileSystem.get(conf);?????????????????????RemoteIterator<LocatedFileStatus>rmIterator?=?fs.listLocatedStatus(new?Path(srcPath));?????????????????????while?(rmIterator.hasNext())?{???????????????????????????????Path?path?=?rmIterator.next().getPath();???????????????????????????????if(fs.isDirectory(path)){?????????????????????????????????????????LOG.info("-----------DirectoryName:?"+path.getName());???????????????????????????????}???????????????????????????????else?if(fs.isFile(path)){?????????????????????????????????????????LOG.info("-----------FileName:?"+path.getName());???????????????????????????????}?????????????????????}????????????}catch?(IOException?e)?{?????????????????????LOG.error("list?fileSysetm?object?stream.:"?,?e);?????????????????????new?RuntimeException(e);????????????}??}??
輸出結果:
2014-03-11 22:38:15,329 INFO? (com.hdfs.client.SyncDFS:48) ------------File Name: README.txt
2014-03-11 22:38:15,331 INFO? (com.hdfs.client.SyncDFS:45) ------------Directory Name: blog_blogpost
2014-03-11 22:38:15,333 INFO? (com.hdfs.client.SyncDFS:45) ------------Directory Name: test
讀取文件中的內容,hdfs命令如下:
hdfs dfs –cat /input
java 代碼:
[plain] view plain
copy print?
public?void?readFile(String?file){?????????????Configurationconf?=?new?Configuration();?????????????FileSystemfs;?????????????try?{??????????????????????fs=?FileSystem.get(conf);??????????????????????Pathpath?=?new?Path(file);??????????????????????if(!fs.exists(path)){???????????????????????????????LOG.warn("file'"+?file+"'?doesn't?exist!");???????????????????????????????return?;??????????????????????}??????????????????????FSDataInputStreamin?=?fs.open(path);??????????????????????Stringfilename?=?file.substring(file.lastIndexOf('/')?+?1,?file.length());??????????????????????OutputStreamout?=?new?BufferedOutputStream(new?FileOutputStream(?????????????????????????????????????????????????????????????????????new?File(filename)));????????????????????????byte[]?b?=?new?byte[1024];??????????????????????int?numBytes?=?0;??????????????????????while?((numBytes?=?in.read(b))?>?0)?{???????????????????????????????out.write(b,0,?numBytes);??????????????????????}??????????????????????in.close();??????????????????????out.close();??????????????????????fs.close();?????????????}catch?(IOException?e)?{??????????????????????LOG.error("ifExists?fs?Exception?caught!?:"?,?e);??????????????????????new?RuntimeException(e);?????????????}???}??
獲取文件的修改時間,java代碼:
[plain] view plain
copy print?
/**??????????*?Gets?the?information?about?the?file?modifiedtime.??????????*?@param?source??????????*?@throws?IOException??????????*/?????????public?void?getModificationTime(String?source)?throws?IOException{???????????????????????????????????????Configurationconf?=?new?Configuration();???????????????????????????????????????FileSystemfs?=?FileSystem.get(conf);???????????????????PathsrcPath?=?new?Path(source);???????????????????????????????????????//?Check?if?the?file?alreadyexists???????????????????if?(!(fs.exists(srcPath)))?{???????????????????System.out.println("No?such?destination?"?+?srcPath);???????????????????return;???????????????????}???????????????????//?Get?the?filename?out?of?thefile?path???????????????????Stringfilename?=?source.substring(source.lastIndexOf('/')?+?1,?source.length());???????????????????????????????????????FileStatusfileStatus?=?fs.getFileStatus(srcPath);???????????????????long?modificationTime?=fileStatus.getModificationTime();???????????????????????????????????????LOG.info("modified?datetime:?"?+?System.out.format("File?%s;?Modification?time?:?%0.2f%n",filename,modificationTime));?????????????????????????????}??
獲取文件塊定位信息,java代碼:
[plain] view plain
copy print?
/**????????????*?Gets?the?file?block?location?info????????????*?@param?source????????????*?@throws?IOException????????????*/???????????public?void?getBlockLocations(String?source)?throws?IOException{?????????????????????Configurationconf?=?new?Configuration();?????????????????????FileSystemfs?=?FileSystem.get(conf);?????????????????????PathsrcPath?=?new?Path(source);???????????????????????????????????????????//?Check?if?the?file?alreadyexists?????????????????????if?(!(ifExists(source)))?{??????????????????????????????System.out.println("No?such?destination?"?+?srcPath);??????????????????????????????return;?????????????????????}?????????????????????//?Get?the?filename?out?of?thefile?path?????????????????????Stringfilename?=?source.substring(source.lastIndexOf('/')?+?1,?source.length());???????????????????????????????????????????FileStatusfileStatus?=?fs.getFileStatus(srcPath);???????????????????????????????????????????BlockLocation[]blkLocations?=?fs.getFileBlockLocations(fileStatus,?0,?fileStatus.getLen());?????????????????????int?blkCount?=?blkLocations.length;???????????????????????????????????????????System.out.println("File?:"?+?filename?+?"stored?at:");?????????????????????for?(int?i=0;?i?<?blkCount;?i++)?{??????????????????????????????String[]hosts?=?blkLocations[i].getHosts();??????????????????????????????LOG.info("host?ip:"?+System.out.format("Host?%d:?%s?%n",?i,?hosts));?????????????????????}???????????}??
獲取Hadoop集群中data node的DNS主機名,java代碼:
[plain] view plain
copy print?
public?void?getHostnames?()?throwsIOException{?????????????????????Configurationconfig?=?new?Configuration();?????????????????????????????????????????FileSystemfs?=?FileSystem.get(config);?????????????????????DistributedFileSystemhdfs?=?(DistributedFileSystem)?fs;?????????????????????DatanodeInfo[]dataNodeStats?=?hdfs.getDataNodeStats();???????????????????????????????????????????String[]names?=?new?String[dataNodeStats.length];?????????????????????for?(int?i?=?0;?i?<?dataNodeStats.length;?i++)?{??????????????????????????????names[i]=?dataNodeStats[i].getHostName();??????????????????????????????LOG.info("datenode?hostname:"+(dataNodeStats[i].getHostName()));?????????????????????}???????????}??
?
2 創建
創建一個目錄,指定具體的文件路徑。hdfs命令如下:
[plain] view plain
copy print?
hdfs?dfs?–mkdir/usr/app/tmp??
java代碼:
? ? ? ?
[java] view plain
copy print?
public?void?mkdir(String?dir){???????????????????Configurationconf?=?new?Configuration();???????????????????FileSystemfs?=?null;???????????????????try?{????????????????????????????fs=?FileSystem.get(conf);????????????????????????????Pathpath?=?new?Path(dir);????????????????????????????if(!fs.exists(path)){?????????????????????????????????????fs.mkdirs(path);?????????????????????????????????????LOG.debug("create?directory?'"+dir+"'?successfully!");????????????????????????????}else{?????????????????????????????????????LOG.debug("directory?'"+dir+"'?exits!");????????????????????????????}???????????????????}catch?(IOException?e)?{????????????????????????????LOG.error("FileSystem?get?configuration?with?anerror");????????????????????????????e.printStackTrace();???????????????????}finally{????????????????????????????if(fs!=?null){?????????????????????????????????????try?{???????????????????????????????????????????????fs.close();?????????????????????????????????????}catch?(IOException?e)?{???????????????????????????????????????????????LOG.error("close?fs?object?stream.?:"?,?e);???????????????????????????????????????????????new?RuntimeException(e);?????????????????????????????????????}????????????????????????????}???????????????????}?????????}??
將本地文件上傳到hdfs上去,java代碼如下:
[plain] view plain
copy print?
public?void?copyFromLocal?(String?source,?String?dest)?{???????????????????????????????????????????Configurationconf?=?new?Configuration();?????????????????????FileSystemfs;?????????????????????try?{??????????????????????????????fs=?FileSystem.get(conf);??????????????????????????????PathsrcPath?=?new?Path(source);?????????????????????????????????????????????????????????????PathdstPath?=?new?Path(dest);??????????????????????????????//?Check?if?the?file?alreadyexists??????????????????????????????if?(!(fs.exists(dstPath)))?{???????????????????????????????????????LOG.warn("dstPathpath?doesn't?exist"?);???????????????????????????????????????LOG.error("No?such?destination?"?+?dstPath);???????????????????????????????????????return;??????????????????????????????}?????????????????????????????????????????????????????????????//?Get?the?filename?out?of?thefile?path??????????????????????????????Stringfilename?=?source.substring(source.lastIndexOf('/')?+?1,?source.length());?????????????????????????????????????????????????????????????try{???????????????????????????????????????//if?the?file?exists?in?thedestination?path,?it?will?throw?exception.??//???????????????????????????????????fs.copyFromLocalFile(srcPath,dstPath);???????????????????????????????????????//remove?and?overwrite?files?withthe?method???????????????????????????????????????//copyFromLocalFile(booleandelSrc,?boolean?overwrite,?Path?src,?Path?dst)???????????????????????????????????????fs.copyFromLocalFile(false,?true,?srcPath,?dstPath);???????????????????????????????????????LOG.info("File?"?+?filename?+?"copied?to?"?+?dest);??????????????????????????????}catch(Exception?e){???????????????????????????????????????LOG.error("copyFromLocalFile?exception?caught!:"?,?e);???????????????????????????????????????new?RuntimeException(e);??????????????????????????????}finally{???????????????????????????????????????fs.close();??????????????????????????????}?????????????????????}catch?(IOException?e1)?{??????????????????????????????LOG.error("copyFromLocal?IOException?objectstream.?:"?,e1);??????????????????????????????new?RuntimeException(e1);?????????????????????}???????????}??
????????
添加一個文件到指定的目錄下,java代碼如下:
? ? ??
[java] view plain
copy print?
public?void?addFile(String?source,?String?dest)??{??????????????????????????????????????????????????????Configurationconf?=?new?Configuration();???????????????????????????FileSystemfs;???????????????????????????try?{????????????????????????????????????fs=?FileSystem.get(conf);????????????????????????????????????????????????????????????????????????Stringfilename?=?source.substring(source.lastIndexOf('/')?+?1,?source.length());???????????????????????????????????????????????????????????????????????????????????????????????????????????if?(dest.charAt(dest.length()?-?1)?!=?'/')?{??????????????????????????????????????????????dest=?dest?+?"/"?+?filename;????????????????????????????????????}else?{??????????????????????????????????????????????dest=?dest?+?filename;????????????????????????????????????}???????????????????????????????????????????????????????????????????????????????????????????????????????????Pathpath?=?new?Path(dest);????????????????????????????????????if?(fs.exists(path))?{??????????????????????????????????????????????LOG.error("File?"?+?dest?+?"?already?exists");??????????????????????????????????????????????return;????????????????????????????????????}???????????????????????????????????????????????????????????????????????????????????????????????????????????FSDataOutputStreamout?=?fs.create(path);????????????????????????????????????InputStreamin?=?new?BufferedInputStream(new?FileInputStream(????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????new?File(source)));???????????????????????????????????????????????????????????????????????byte[]?b?=?new?byte[1024];????????????????????????????????????int?numBytes?=?0;????????????????????????????????????????????????????????????????????????while?((numBytes?=?in.read(b))?>?0)?{??????????????????????????????????????????????out.write(b,0,?numBytes);????????????????????????????????????}????????????????????????????????????in.close();????????????????????????????????????out.close();????????????????????????????????????fs.close();???????????????????????????}catch?(IOException?e)?{????????????????????????????????????LOG.error("addFile?Exception?caught!?:"?,?e);????????????????????????????????????new?RuntimeException(e);???????????????????????????}????????}??
3 修改
重新命名hdfs中的文件名稱,java代碼如下:
? ??
[java] view plain
copy print?
public?void?renameFile?(String?fromthis,?String?tothis){????????????????Configurationconf?=?new?Configuration();?????????????????????????????????FileSystemfs;????????????????try?{?????????????????????????fs=?FileSystem.get(conf);?????????????????????????PathfromPath?=?new?Path(fromthis);?????????????????????????PathtoPath?=?new?Path(tothis);???????????????????????????????????????????????????if?(!(fs.exists(fromPath)))?{??????????????????????????????????LOG.info("No?such?destination?"?+?fromPath);?????????????????????????????return;?????????????????????????}???????????????????????????????????????????????????if?(fs.exists(toPath))?{??????????????????????????????????LOG.info("Already?exists!?"?+?toPath);?????????????????????????????return;?????????????????????????}???????????????????????????????????????????????????try{??????????????????????????????????boolean?isRenamed?=?fs.rename(fromPath,toPath);???????????????????????????????????????if(isRenamed){????????????????????????????????????????????LOG.info("Renamed?from?"?+?fromthis?+?"?to?"?+?tothis);??????????????????????????????????}?????????????????????????}catch(Exception?e){??????????????????????????????????LOG.error("renameFile?Exception?caught!?:"?,?e);??????????????????????????????????new?RuntimeException(e);?????????????????????????}finally{??????????????????????????????????fs.close();?????????????????????????}????????????????}catch?(IOException?e1)?{?????????????????????????LOG.error("fs?Exception?caught!?:"?,?e1);?????????????????????????new?RuntimeException(e1);????????????????}??????}??
?
4 刪除
在hdfs上,刪除指定的一個文件。Java代碼:
[java] view plain
copy print?
public?void?deleteFile(String?file)??{??????????????????????????????Configurationconf?=?new?Configuration();??????????????????????????????FileSystemfs;??????????????????????????????try?{???????????????????????????????????????fs=?FileSystem.get(conf);?????????????????????????????????????????????????Pathpath?=?new?Path(file);???????????????????????????????????????if?(!fs.exists(path))?{?????????????????????????????????????????????????LOG.info("File?"?+?file?+?"?does?not?exists");?????????????????????????????????????????????????return;???????????????????????????????????????}???????????????????????????????????????????????????????????????????????????????????fs.delete(new?Path(file),?true);???????????????????????????????????????fs.close();??????????????????????????????}catch?(IOException?e)?{???????????????????????????????????????LOG.error("deleteFile?Exception?caught!?:"?,?e);???????????????????????????????????????new?RuntimeException(e);??????????????????????????????}????????????????????????}??
?
Appendix 完整代碼
?
[java] view plain
copy print?
import?java.io.BufferedInputStream;??import?java.io.BufferedOutputStream;??import?java.io.File;??import?java.io.FileInputStream;??import?java.io.FileOutputStream;??import?java.io.IOException;??import?java.io.InputStream;??import?java.io.OutputStream;?????import?org.apache.commons.logging.Log;??import?org.apache.commons.logging.LogFactory;??import?org.apache.hadoop.conf.Configuration;??import?org.apache.hadoop.fs.BlockLocation;??import?org.apache.hadoop.fs.FSDataInputStream;??import?org.apache.hadoop.fs.FSDataOutputStream;??import?org.apache.hadoop.fs.FileStatus;??import?org.apache.hadoop.fs.FileSystem;??import?org.apache.hadoop.fs.LocatedFileStatus;??import?org.apache.hadoop.fs.Path;??import?org.apache.hadoop.fs.RemoteIterator;??importorg.apache.hadoop.hdfs.DistributedFileSystem;??import?org.apache.hadoop.hdfs.protocol.DatanodeInfo;?????public?class?SyncDFS?{?????????????????????private?static?final?Log?LOG?=?LogFactory.getLog(SyncDFS.class);???????????????????????????????????public?void?list(String?srcPath)?{??????????????????????Configuration?conf?=?new?Configuration();??????????????????????LOG.info("[Defaultfs]?:"?+conf.get("fs.default.name"));????????????????????????FileSystem?fs;?????????????????????try?{??????????????????????????????fs=?FileSystem.get(conf);??????????????????????????????RemoteIterator<LocatedFileStatus>rmIterator?=?fs.listLocatedStatus(new?Path(srcPath));??????????????????????????????while?(rmIterator.hasNext())?{????????????????????????????????????????Path?path?=?rmIterator.next().getPath();????????????????????????????????????????if(fs.isDirectory(path)){??????????????????????????????????????????????????LOG.info("-----------DirectoryName:?"+path.getName());????????????????????????????????????????}????????????????????????????????????????else?if(fs.isFile(path)){??????????????????????????????????????????????????LOG.info("-----------FileName:?"+path.getName());????????????????????????????????????????}??????????????????????????????}?????????????????????}catch?(IOException?e)?{??????????????????????????????LOG.error("list?fileSysetm?object?stream.:"?,?e);??????????????????????????????new?RuntimeException(e);?????????????????????}???????????}????????????????????????????public?void?mkdir(String?dir){?????????????????????Configurationconf?=?new?Configuration();?????????????????????FileSystemfs?=?null;?????????????????????try?{??????????????????????????????fs=?FileSystem.get(conf);??????????????????????????????Pathpath?=?new?Path(dir);??????????????????????????????if(!fs.exists(path)){???????????????????????????????????????fs.mkdirs(path);???????????????????????????????????????LOG.debug("create?directory?'"+dir+"'?successfully!");??????????????????????????????}else{???????????????????????????????????????LOG.debug("directory?'"+dir+"'?exits!");??????????????????????????????}?????????????????????}catch?(IOException?e)?{??????????????????????????????LOG.error("FileSystem?get?configuration?with?anerror");??????????????????????????????e.printStackTrace();?????????????????????}finally{??????????????????????????????if(fs!=?null){???????????????????????????????????????try?{?????????????????????????????????????????????????fs.close();???????????????????????????????????????}catch?(IOException?e)?{?????????????????????????????????????????????????LOG.error("close?fs?object?stream.?:"?,?e);?????????????????????????????????????????????????new?RuntimeException(e);???????????????????????????????????????}??????????????????????????????}?????????????????????}???????????}???????????????????????????????????public?void?readFile(String?file){?????????????????????Configurationconf?=?new?Configuration();?????????????????????FileSystemfs;?????????????????????try?{??????????????????????????????fs=?FileSystem.get(conf);??????????????????????????????Pathpath?=?new?Path(file);??????????????????????????????if(!fs.exists(path)){???????????????????????????????????????LOG.warn("file'"+?file+"'?doesn't?exist!");???????????????????????????????????????return?;??????????????????????????????}??????????????????????????????FSDataInputStreamin?=?fs.open(path);??????????????????????????????Stringfilename?=?file.substring(file.lastIndexOf('/')?+?1,?file.length());??????????????????????????????OutputStreamout?=?new?BufferedOutputStream(new?FileOutputStream(??????????????????????????????????????????????????????????????????????????????new?File(filename)));?????????????????????????????????byte[]?b?=?new?byte[1024];??????????????????????????????int?numBytes?=?0;??????????????????????????????while?((numBytes?=?in.read(b))?>?0)?{???????????????????????????????????????out.write(b,0,?numBytes);??????????????????????????????}??????????????????????????????in.close();??????????????????????????????out.close();??????????????????????????????fs.close();?????????????????????}catch?(IOException?e)?{??????????????????????????????LOG.error("ifExists?fs?Exception?caught!?:"?,?e);??????????????????????????????new?RuntimeException(e);?????????????????????}???????????}?????????????????????public?boolean?ifExists(String?source){?????????????????????if(source?==?null?||?source.length()?==0){??????????????????????????????return?false;?????????????????????}?????????????????????Configurationconf?=?new?Configuration();?????????????????????FileSystemfs?=?null;?????????????????????try?{??????????????????????????????fs=?FileSystem.get(conf);??????????????????????????????LOG.debug("judge?file?'"+source?+??"'");??????????????????????????????return?fs.exists(new?Path(source));?????????????????????}catch?(IOException?e)?{??????????????????????????????LOG.error("ifExists?fs?Exception?caught!?:"?,?e);??????????????????????????????new?RuntimeException(e);??????????????????????????????return?false;?????????????????????}finally{??????????????????????????????if(fs?!=?null){???????????????????????????????????????try?{?????????????????????????????????????????????????fs.close();???????????????????????????????????????}catch?(IOException?e)?{?????????????????????????????????????????????????LOG.error("fs.close?Exception?caught!?:"?,?e);?????????????????????????????????????????????????new?RuntimeException(e);???????????????????????????????????????}??????????????????????????????}??????????????????????????????????????????????????}???????????????????????????????}??????????????????????????????????????public?void?copyFromLocal?(String?source,?String?dest)?{???????????????????????????????????????????Configurationconf?=?new?Configuration();?????????????????????FileSystemfs;?????????????????????try?{??????????????????????????????fs=?FileSystem.get(conf);??????????????????????????????PathsrcPath?=?new?Path(source);?????????????????????????????????????????????????????????????PathdstPath?=?new?Path(dest);????????????????????????????????????????????????????????????if?(!(fs.exists(dstPath)))?{???????????????????????????????????????LOG.warn("dstPathpath?doesn't?exist"?);???????????????????????????????????????LOG.error("No?such?destination?"?+?dstPath);???????????????????????????????????????return;??????????????????????????????}???????????????????????????????????????????????????????????????????????????????????????????Stringfilename?=?source.substring(source.lastIndexOf('/')?+?1,?source.length());?????????????????????????????????????????????????????????????try{??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????fs.copyFromLocalFile(false,?true,?srcPath,?dstPath);???????????????????????????????????????LOG.info("File?"?+?filename?+?"copied?to?"?+?dest);??????????????????????????????}catch(Exception?e){???????????????????????????????????????LOG.error("copyFromLocalFile?exception?caught!:"?,?e);???????????????????????????????????????new?RuntimeException(e);??????????????????????????????}finally{???????????????????????????????????????fs.close();??????????????????????????????}?????????????????????}catch?(IOException?e1)?{??????????????????????????????LOG.error("copyFromLocal?IOException?objectstream.?:"?,e1);??????????????????????????????new?RuntimeException(e1);?????????????????????}?????????????????????}???????????????????????????????public?void?renameFile?(String?fromthis,?String?tothis){?????????????????????Configurationconf?=?new?Configuration();???????????????????????????????????????????FileSystemfs;?????????????????????try?{??????????????????????????????fs=?FileSystem.get(conf);??????????????????????????????PathfromPath?=?new?Path(fromthis);??????????????????????????????PathtoPath?=?new?Path(tothis);?????????????????????????????????????????????????????????????if?(!(fs.exists(fromPath)))?{???????????????????????????????????????LOG.info("No?such?destination?"?+?fromPath);??????????????????????????????????return;??????????????????????????????}?????????????????????????????????????????????????????????????if?(fs.exists(toPath))?{???????????????????????????????????????LOG.info("Already?exists!?"?+?toPath);??????????????????????????????????return;??????????????????????????????}?????????????????????????????????????????????????????????????try{???????????????????????????????????????boolean?isRenamed?=?fs.rename(fromPath,toPath);????????????????????????????????????????????if(isRenamed){?????????????????????????????????????????????????LOG.info("Renamed?from?"?+?fromthis?+?"?to?"?+?tothis);???????????????????????????????????????}??????????????????????????????}catch(Exception?e){???????????????????????????????????????LOG.error("renameFile?Exception?caught!?:"?,?e);???????????????????????????????????????new?RuntimeException(e);??????????????????????????????}finally{???????????????????????????????????????fs.close();??????????????????????????????}?????????????????????}catch?(IOException?e1)?{??????????????????????????????LOG.error("fs?Exception?caught!?:"?,?e1);??????????????????????????????new?RuntimeException(e1);?????????????????????}???????????}????????????????????????????????????public?void?addFile(String?source,?String?dest)??{????????????????????????????????????????????????????????????Configurationconf?=?new?Configuration();??????????????????????????????FileSystemfs;??????????????????????????????try?{???????????????????????????????????????fs=?FileSystem.get(conf);??????????????????????????????????????????????????????????????????????????????Stringfilename?=?source.substring(source.lastIndexOf('/')?+?1,?source.length());????????????????????????????????????????????????????????????????????????????????????????????????????????????????????if?(dest.charAt(dest.length()?-?1)?!=?'/')?{?????????????????????????????????????????????????dest=?dest?+?"/"?+?filename;???????????????????????????????????????}else?{?????????????????????????????????????????????????dest=?dest?+?filename;???????????????????????????????????????}????????????????????????????????????????????????????????????????????????????????????????????????????????????????????Pathpath?=?new?Path(dest);???????????????????????????????????????if?(fs.exists(path))?{?????????????????????????????????????????????????LOG.error("File?"?+?dest?+?"?already?exists");?????????????????????????????????????????????????return;???????????????????????????????????????}????????????????????????????????????????????????????????????????????????????????????????????????????????????????????FSDataOutputStreamout?=?fs.create(path);???????????????????????????????????????InputStreamin?=?new?BufferedInputStream(new?FileInputStream(???????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????new?File(source)));?????????????????????????????????????????????????????????????????????????????byte[]?b?=?new?byte[1024];???????????????????????????????????????int?numBytes?=?0;??????????????????????????????????????????????????????????????????????????????while?((numBytes?=?in.read(b))?>?0)?{?????????????????????????????????????????????????out.write(b,0,?numBytes);???????????????????????????????????????}???????????????????????????????????????in.close();???????????????????????????????????????out.close();???????????????????????????????????????fs.close();??????????????????????????????}catch?(IOException?e)?{???????????????????????????????????????LOG.error("addFile?Exception?caught!?:"?,?e);???????????????????????????????????????new?RuntimeException(e);??????????????????????????????}???????????}???????????????????????????????????public?void?deleteFile(String?file)??{??????????????????????????????Configurationconf?=?new?Configuration();??????????????????????????????FileSystemfs;??????????????????????????????try?{???????????????????????????????????????fs=?FileSystem.get(conf);?????????????????????????????????????????????????Pathpath?=?new?Path(file);???????????????????????????????????????if?(!fs.exists(path))?{?????????????????????????????????????????????????LOG.info("File?"?+?file?+?"?does?not?exists");?????????????????????????????????????????????????return;???????????????????????????????????????}???????????????????????????????????????????????????????????????????????????????????fs.delete(new?Path(file),?true);???????????????????????????????????????fs.close();??????????????????????????????}catch?(IOException?e)?{???????????????????????????????????????LOG.error("deleteFile?Exception?caught!?:"?,?e);???????????????????????????????????????new?RuntimeException(e);??????????????????????????????}????????????????????????}??????????????????????????public?void?getModificationTime(String?source)?throws?IOException{???????????????????????????????????????????Configurationconf?=?new?Configuration();???????????????????????????????????????????FileSystemfs?=?FileSystem.get(conf);?????????????????????PathsrcPath?=?new?Path(source);????????????????????????????????????????????????????????????????if?(!(fs.exists(srcPath)))?{?????????????????????System.out.println("No?such?destination?"?+?srcPath);?????????????????????return;?????????????????????}??????????????????????????????????????????Stringfilename?=?source.substring(source.lastIndexOf('/')?+?1,?source.length());???????????????????????????????????????????FileStatusfileStatus?=?fs.getFileStatus(srcPath);?????????????????????long?modificationTime?=fileStatus.getModificationTime();???????????????????????????????????????????LOG.info("modified?datetime:?"?+?System.out.format("File?%s;?Modification?time?:?%0.2f%n",filename,modificationTime));?????????????????????????????????}????????????????????????????????????public?void?getBlockLocations(String?source)?throws?IOException{?????????????????????Configurationconf?=?new?Configuration();?????????????????????FileSystemfs?=?FileSystem.get(conf);?????????????????????PathsrcPath?=?new?Path(source);????????????????????????????????????????????????????????????????if?(!(ifExists(source)))?{??????????????????????????????System.out.println("No?such?destination?"?+?srcPath);??????????????????????????????return;?????????????????????}??????????????????????????????????????????Stringfilename?=?source.substring(source.lastIndexOf('/')?+?1,?source.length());???????????????????????????????????????????FileStatusfileStatus?=?fs.getFileStatus(srcPath);???????????????????????????????????????????BlockLocation[]blkLocations?=?fs.getFileBlockLocations(fileStatus,?0,?fileStatus.getLen());?????????????????????int?blkCount?=?blkLocations.length;???????????????????????????????????????????System.out.println("File?:"?+?filename?+?"stored?at:");?????????????????????for?(int?i=0;?i?<?blkCount;?i++)?{??????????????????????????????String[]hosts?=?blkLocations[i].getHosts();??????????????????????????????LOG.info("host?ip:"?+System.out.format("Host?%d:?%s?%n",?i,?hosts));?????????????????????}???????????}?????????????????????public?void?getHostnames?()?throws?IOException{?????????????????????Configurationconfig?=?new?Configuration();?????????????????????????????????????????FileSystemfs?=?FileSystem.get(config);?????????????????????DistributedFileSystemhdfs?=?(DistributedFileSystem)?fs;?????????????????????DatanodeInfo[]dataNodeStats?=?hdfs.getDataNodeStats();???????????????????????????????????????????String[]names?=?new?String[dataNodeStats.length];?????????????????????for?(int?i?=?0;?i?<?dataNodeStats.length;?i++)?{??????????????????????????????names[i]=?dataNodeStats[i].getHostName();??????????????????????????????LOG.info("datenode?hostname:"+(dataNodeStats[i].getHostName()));?????????????????????}???????????}????????????????????????public?static?void?main(String[]?args)?{?????????????????????SyncDFSdfs?=?new?SyncDFS();?????????????????????dfs.list("/user/app");?????????????????????????????????????????dfs.mkdir("/user/app");???????????????????????????????????????????LOG.info("--------------"?+???????????????????????????????dfs.ifExists("/user/warehouse/hbase.db/u_data/u.data"));??????????????????????LOG.info("--------------"?+?dfs.ifExists("/user/app/README.txt"));????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????}???????????????}?
總結
以上是生活随笔為你收集整理的java实现对HDFS增删改查(CRUD)等操作的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。