模拟MapReduce编程的程序案例(用于统计文本中单词出现频率)
本案例要實現的目標:
1、模擬修改配置,通過發(fā)指令的方式統(tǒng)計一個文件中出現的單詞的字數。
案例代碼結構如下:
在整個案例中需要有以下幾類文件:
A:worker服務端,用于類似Mapreduce接收jar,接收配置文件,執(zhí)行業(yè)務邏輯
B:程序客戶端、用于組裝配置文件、發(fā)送業(yè)務執(zhí)行的命令(聽過socket發(fā)送jarfile、jobconf、和job2run的命令)
代碼結構,每個包和代碼作用介紹
cn.toto.bigdata.mymr.task | TaskProcessor | 核心的主體執(zhí)行程序 |
? | ProcessLogic | 定義客戶端調用必須實現的方法,相當于WebService中的接口規(guī)范 |
cn.toto.bigdata.mymr.io | InputFormat | 封裝讀文件的組件(接口用途) |
? | DefaultInputFormat | 封裝讀文件的組件的實現類 |
? | OutPutFormat | 封裝寫文件的組件(接口用途) |
? | DefaultOutPutFormat | 封裝寫文件的組件的實現 |
cn.toto.bigdata.mymr.common | Constants | 常量定義 |
? | Context | 應用上下文,用于存儲計算單詞出現字數的次數的中間變量 |
cn.toto.bigdata.mymr.userapp | UserLogic | 客戶端對ProcessLogic規(guī)范的實現 |
? | UserApp | 客戶端主入口程序 |
cn.toto.bigdata.mymr.scheduler | Runner | 客戶端UserApp執(zhí)行命令是依賴的Runner類,通過這里面的Socket發(fā)送命令。 |
? | WorkerClient | 客戶端執(zhí)行時需要用到的client相關的代碼 |
? | WorkerServer | UserApp執(zhí)行時,需要提前啟動的服務端 |
? | WorkerRunnable | 服務端執(zhí)行的相關邏輯 |
?
運行條件:
1、將mapreduce-my-demo導出成test.jar放置在E:/test.jar下。 |
2、需要有用于統(tǒng)計用的文本文件a.txt,文件在E:\a.txt 內容截圖類似: 假設a.txt內容為: The true nobility is in being superior to your previous self guess No great discovery was ever made without a bold Knowledge will give you power but character respect The sun is just rising in the morning of another day I I figure life is a gift and I don’t intend on wasting |
3、首先運行:WorkerServer,相當于是啟動服務端的代碼 |
4、再次運行:UserApp,相當于是客戶端 |
5、最終的統(tǒng)計結果將顯示在:E:/out.txt中。統(tǒng)計結果如下: nobility???? 1 but?? 1 gift?? 1 wasting??? 1 rising??????? 1 don't???????? 1 another??? 1 I??????? 3 your 1 Knowledge?????? 1 sun?? 1 without??? 1 life??? 1 The? 2 character 1 and? 1 of????? 1 power?????? 1 just? 1 day?? 1 you?? 1 on???? 1 No??? 1 a?????? 2 give? 1 figure??????? 1 previous?? 1 in????? 2 will?? 1 made??????? 1 was? 1 is????? 3 being??????? 1 bold 1 great??????? 1 respect??? 1 morning?? 1 the?? 1 ever 1 superior?? 1 guess??????? 1 discovery 1 true 1 self?? 1 to???? 1 intend?????? 1 ? |
6、最終的日志將存儲在:E:/task/task.log,最終的配置和工作用的jar也將會生成到這個目錄下面,效果如下: 其中job.conf的內容為: 生成的task.log效果如下: |
?
接著:具體的代碼實現如下:
cn.toto.bigdata.mymr.task | TaskProcessor | 核心的主體執(zhí)行程序 |
? | ProcessLogic | 定義客戶端調用必須實現的方法,相當于WebService中的接口規(guī)范 |
TaskProcessor代碼如下 package cn.toto.bigdata.mymr.task; ? import java.util.HashMap; import java.util.logging.FileHandler; import java.util.logging.Level; import java.util.logging.Logger; ? import cn.toto.bigdata.mymr.common.Constants; import cn.toto.bigdata.mymr.common.Context; import cn.toto.bigdata.mymr.io.InputFormat; import cn.toto.bigdata.mymr.io.OutPutFormat; ? /** ?* 1、核心的主體執(zhí)行程序 ?* 這里是任務執(zhí)行者 ?*/ public class TaskProcessor { ? ?? public static void main(String[] args) throws Exception { ????? // 加載用戶指定的所有配置參數到上下文對象中,同時讀取配置文件 ????? Context context = new Context(); ????? //獲取上下文中的配置文件 ????? HashMap<String, String>? conf = context.getConfiguration(); ????? ????? //通過打印日志的方式查看程序運行的結果 ????? Logger logger = Logger.getLogger("TaskProcessor"); ????? //設置日志的輸出級別是INFO級別 ????? logger.setLevel(Level.INFO); ????? FileHandler fileHandler = new FileHandler("E:/task/task.log"); ????? fileHandler.setLevel(Level.INFO); ????? logger.addHandler(fileHandler); ????? logger.info("context:" + context); ????? logger.info("conf:" + conf); ????? ????? //初始化文件讀取組件 ????? //從配置文件中獲取用于讀取的組件的class信息 ????? Class<?> forName = Class.forName(conf.get(Constants.INPUT_FORMAT)); ????? InputFormat inputFormat = (InputFormat) forName.newInstance(); ????? inputFormat.init(context); ?? ?? ????? //用inputFormat組件讀數據,并調用用戶邏輯 ????? Class<?> forName2 = Class.forName(conf.get(Constants.USER_LOGIC)); ????? ProcessLogic userLogic = (ProcessLogic) forName2.newInstance(); ????? //對每一行調用用戶邏輯,并通過context將用戶調用結果存儲內部緩存 ????? while(inputFormat.hasNext()) { ???????? Integer key = inputFormat.nextKey(); ???????? String value = inputFormat.nextValue(); ???????? userLogic.process(key, value, context); ????? } ????? userLogic.cleanUp(context); ????? ????? //替用戶輸出結果 ????? Class<?> forName3 = Class.forName(conf.get(Constants.OUTPUT_FORMAT)); ????? OutPutFormat outputFormat = (OutPutFormat) forName3.newInstance(); ????? outputFormat.write(context); ?? } } ? ? ProcessLogic代碼如下: package cn.toto.bigdata.mymr.task; ? import cn.toto.bigdata.mymr.common.Context; ? /** ?* 1、規(guī)定的業(yè)務邏輯編寫規(guī)范 ?* process() 和? cleanUp都沒有寫實現,這里的實現在客戶端 ?*/ public abstract class ProcessLogic { ? ?? /** ?? ?* 這里的context存儲處理后的結果值 ?? ?* @param key????????? :行號 ?? ?* @param value??????? :所在行的一行內容 ?? ?* @param context????? :應用上下文的內容 ?? ?*/ ?? public abstract void process(Integer key,String value,Context context); ?? ?? /** ?? ?* 通過CleanUp輸出處理后的結果 ?? ?*/ ?? public void cleanUp(Context context){} } ? | ||
cn.toto.bigdata.mymr.io | InputFormat | 封裝讀文件的組件(接口用途) |
? | DefaultInputFormat | 封裝讀文件的組件的實現類 |
? | OutPutFormat | 封裝寫文件的組件(接口用途) |
? | DefaultOutPutFormat | 封裝寫文件的組件的實現 |
package cn.toto.bigdata.mymr.io; ? import cn.toto.bigdata.mymr.common.Context; ? public abstract class InputFormat { ??? ?? /** ?? ?* 獲取下一行要讀的行的位置 ?? ?*/ ?? public abstract int nextKey(); ? ?? /** ?? ?* 獲取從文件中讀取的到的行的信息 ?? ?*/ ?? public abstract String nextValue(); ? ?? /** ?? ?* 從文件中讀取到一行信息 ?? ?*/ ?? public abstract String readLine() throws Exception; ?? ?? /** ?? ?* 判斷是否還可以讀取到下一行的內容 ?? ?*/ ?? public abstract boolean hasNext() throws Exception; ?? ?? /** ?? ?* 初始化要讀取的文件的路徑和文件流 ?? ?*/ ?? public abstract void init(Context context) throws Exception; } | ||
package cn.toto.bigdata.mymr.io; ? import java.io.BufferedReader; import java.io.FileReader; ? import cn.toto.bigdata.mymr.common.Constants; import cn.toto.bigdata.mymr.common.Context; ? /** ?* 這里是默認的讀取的實現類 ?*/ public class DefaultInputFormat extends InputFormat{ ?? //這里表示要讀取的文件的路徑 ?? private String inputPath; ?? private BufferedReader br = null; ?? //這里的key是指文本中類似讀取到的指針的偏移量,是行號的偏移量 ?? private int key; ?? //這里的value是指一行中的數據 ?? private String value; ?? //默認讀取的行是第0行 ?? private int lineNumber = 0; ?? ?? @Override ?? public void init(Context context) throws Exception { ????? //獲取要讀的文件的路徑 ?? ??? this.inputPath = context.getConfiguration().get(Constants.INPUT_PATH); ?? ??? //開始初始化輸入流,只不過,這個流是從文件中獲取的 ?? ??? this.br = new BufferedReader(new FileReader(inputPath)); ?? } ?? ?? @Override ?? public int nextKey() { ????? return this.key; ?? } ? ?? @Override ?? public String nextValue() { ????? return this.value; ?? } ?? ?? @Override ?? public boolean hasNext() throws Exception { ????? String line = null; ????? line = readLine(); ????? ????? //數據讀取完成之后行號加一 ????? this.key = lineNumber++; ????? this.value = line; ????? ????? return null != line; ?? } ? ?? /** ?? ?* 讀取一行數據 ?? ?*/ ?? @Override ?? public String readLine() throws Exception { ????? String line = br.readLine(); ????? //如果讀取到空了之后,將BufferedReader的值變成空 ????? if (line == null) { ???????? br.close(); ????? } ????? return line; ?? } } | ||
package cn.toto.bigdata.mymr.io; ? import cn.toto.bigdata.mymr.common.Context; ? /** ?* 用于輸出結果的類 ?*/ public abstract class OutPutFormat { ? ?? /** ?? ?* 將結果寫入文件中 ?? ?*/ ?? public abstract void write(Context context) throws Exception; ? ?? /** ?? ?* 關閉流 ?? ?*/ ??? public abstract void cleanUp() throws Exception; } ? | ||
package cn.toto.bigdata.mymr.io; ? import java.io.BufferedWriter; import java.io.FileWriter; import java.util.HashMap; import java.util.Set; import java.util.Map.Entry; ? import cn.toto.bigdata.mymr.common.Constants; import cn.toto.bigdata.mymr.common.Context; ? public class DefaultOutPutFormat extends OutPutFormat{ ??? BufferedWriter bw = null; ???????? ???????? @Override ???????? public void write(Context context) throws Exception { ???????? ??? String outputPath = context.getConfiguration().get(Constants.OUTPUT_PATH); ???????? ??? HashMap<String, Integer> KVBuffer = context.getKVBuffer(); ???????? ??? this.bw = new BufferedWriter(new FileWriter(outputPath)); ???????? ??? Set<Entry<String, Integer>> entrySet = KVBuffer.entrySet(); ???????? ??? for (Entry<String, Integer> entry : entrySet) { ??????????????????????????? bw.write(entry.getKey() + "\t" + entry.getValue() + "\r"); ?????????????????? } ???????? ??? bw.flush(); ???????? } ? ???????? @Override ???????? public void cleanUp() throws Exception { ?????????????????? bw.close(); ???????? } ? } | ||
cn.toto.bigdata.mymr.common | Constants | 常量定義 |
? | Context | 應用上下文,用于存儲計算單詞出現字數的次數的中間變量 |
package cn.toto.bigdata.mymr.common; ? public class Constants { ? ?? public static final String JAR_PATH = "jar.path"; ?? ?? public static final String JAR_FILE = "job.jar"; ?? ?? public static final String WORKER_HOST = "worker.host"; ?? ?? public static final String WORKER_PORT = "worker.port"; ?? ?? public static final String CONF_FILE = "job.conf"; ?? ?? public static final String INPUT_FORMAT = "input.format.class"; ?? ?? public static final String OUTPUT_FORMAT = "output.format.class"; ?? ?? public static final String INPUT_PATH = "input.path"; ?? ?? public static final String OUTPUT_PATH = "output.path"; ?? ?? public static final String TASK_PROCESSOR = "cn.toto.bigdata.mymr.task.TaskProcessor"; ? ?? public static final String USER_LOGIC = "user.logic.class"; ?? ?? public static final String TASK_WORK_DIR = "E:/task"; ?? ?? } | ||
package cn.toto.bigdata.mymr.common; ? import java.io.File; import java.io.FileInputStream; import java.io.ObjectInputStream; import java.util.HashMap; ? /** ?* 應用上下文,通過這個內容獲取配置文件 ?* 通過這個上下文最終輸出結果 ?*/ public class Context { ???????? private HashMap<String, Integer> KVBuffer = new HashMap<String, Integer>(); ???????? private HashMap<String, String> conf; ???????? ???????? @SuppressWarnings("unchecked") ???????? public Context() throws Exception { ?????????????????? //加載配置參數 ?????????????????? File file = new File(Constants.TASK_WORK_DIR + "/" + Constants.CONF_FILE); ?????????????????? if (file.exists()) { ??????????????????????????? @SuppressWarnings("resource") ??????????????????????????? ObjectInputStream oi = new ObjectInputStream(new FileInputStream(file)); ??????????????????????????? this.conf = (HashMap<String, String>) oi.readObject(); ?????????????????? } else { ??????????????????????????? // throw new RuntimeException("read conf failed ...."); ?????????????????? } ???????? } ???????? ???????? /** ???????? ?* 通過這種變量最后輸出結果 ???????? ?*/ ???????? public void write(String k, Integer v) { ?????????????????? KVBuffer.put(k, v); ???????? } ? ???????? public HashMap<String, Integer> getKVBuffer() { ?????????????????? return KVBuffer; ???????? } ? ???????? public void setKVBuffer(HashMap<String, Integer> tmpKV) { ?????????????????? this.KVBuffer = tmpKV; ???????? } ? ???????? /** ???????? ?* 獲取配置文件中的信息 ???????? ?*/ ???????? public HashMap<String, String> getConfiguration() { ?????????????????? return conf; ???????? } ? ???????? /** ???????? ?* 在Context()構造函數里面已經有了conf的配置,這里再次傳入說明配置可以讓用戶手動指定 ???????? ?*/ ???????? public void setConfiguration(HashMap<String, String> configuration) { ?????????????????? this.conf = configuration; ???????? } } | ||
cn.toto.bigdata.mymr.userapp | UserLogic | 客戶端對ProcessLogic規(guī)范的實現 |
? | UserApp | 客戶端主入口程序 |
package cn.toto.bigdata.mymr.userapp; ? import java.util.HashMap; import java.util.Set; import java.util.Map.Entry; ? import cn.toto.bigdata.mymr.common.Context; import cn.toto.bigdata.mymr.task.ProcessLogic; ? public class UserLogic extends ProcessLogic { ? ???????? private HashMap<String, Integer> wordCount = new HashMap<String, Integer>(); ???????? ???????? @Override ???????? public void process(Integer key, String value, Context context) { ?????????????????? String [] words = value.split(" "); ?????????????????? for(String word : words) { ??????????????????????????? Integer count = wordCount.get(word); ??????????????????????????? if (count == null) { ???????????????????????????????????? wordCount.put(word, 1); ??????????????????????????? } else { ???????????????????????????????????? wordCount.put(word, count + 1); ??????????????????????????? } ?????????????????? } ???????? } ? ???????? public void cleanUp(Context context) { ?????????????????? Set<Entry<String, Integer>> entrySet = wordCount.entrySet(); ?????????????????? for(Entry<String, Integer> entry : entrySet) { ??????????????????????????? context.write(entry.getKey(), entry.getValue()); ?????????????????? } ???????? } } | ||
package cn.toto.bigdata.mymr.userapp; ? import java.util.HashMap; ? import cn.toto.bigdata.mymr.common.Constants; import cn.toto.bigdata.mymr.scheduler.Runner; ? public class UserApp { ??? ???????? public static void main(String[] args) throws Exception { ?????????????????? HashMap<String, String> conf = new HashMap<String,String>(); ?????????????????? conf.put(Constants.INPUT_PATH, "E:/a.txt"); ?????????????????? conf.put(Constants.OUTPUT_PATH, "E:/out.txt"); ?????????????????? conf.put(Constants.INPUT_FORMAT, "cn.toto.bigdata.mymr.io.DefaultInputFormat"); ?????????????????? conf.put(Constants.OUTPUT_FORMAT, "cn.toto.bigdata.mymr.io.DefaultOutPutFormat"); ?????????????????? conf.put(Constants.JAR_PATH, "E:/test.jar"); ?????????????????? conf.put(Constants.WORKER_HOST, "localhost"); ?????????????????? conf.put(Constants.WORKER_PORT, "9889"); ?????????????????? conf.put(Constants.USER_LOGIC, "cn.toto.bigdata.mymr.userapp.UserLogic"); ?????????????????? ?????????????????? Runner runner = new Runner(conf); ?????????????????? runner.submit("localhost", 9889); ???????? } } | ||
cn.toto.bigdata.mymr.scheduler | Runner | 客戶端UserApp執(zhí)行命令是依賴的Runner類,通過這里面的Socket發(fā)送命令。 |
? | WorkerClient | 客戶端執(zhí)行時需要用到的client相關的代碼 |
? | WorkerServer | UserApp執(zhí)行時,需要提前啟動的服務端 |
? | WorkerRunnable | 服務端執(zhí)行的相關邏輯 |
package cn.toto.bigdata.mymr.scheduler; ? import java.io.FileOutputStream; import java.io.ObjectOutputStream; import java.util.HashMap; ? import cn.toto.bigdata.mymr.common.Constants; ? public class Runner { ??? private HashMap<String, String> conf; ???????? ??? public Runner(HashMap<String, String> conf) { ??? ???????? this.conf = conf; ??? } ??? ??? public void submit(String host,int port) throws Exception { ??? ???????? ObjectOutputStream jobConfStream = new ObjectOutputStream(new FileOutputStream(Constants.CONF_FILE)); ?????????????????? jobConfStream.writeObject(conf); ?????????????????? ?????????????????? WorkerClient workerClient = new WorkerClient(conf); ?????????????????? workerClient.submit(); ??? } } | ||
package cn.toto.bigdata.mymr.scheduler; ? import java.io.FileInputStream; import java.io.OutputStream; import java.net.Socket; import java.util.HashMap; ? import cn.toto.bigdata.mymr.common.Constants; ? public class WorkerClient { ??? ???????? private HashMap<String, String> conf; ???????? Socket socket = null; ???????? OutputStream so = null; ???????? ???????? public WorkerClient(HashMap<String, String> conf) { ?????????????????? this.conf = conf; ???????? } ???????? ???????? public void submit() throws Exception { ?????????????????? socket = new Socket(conf.get(Constants.WORKER_HOST), Integer.parseInt(conf.get(Constants.WORKER_PORT))); ?????????????????? so = socket.getOutputStream(); ? ?????????????????? String jarPath = conf.get(Constants.JAR_PATH); ? ?????????????????? // 發(fā)送jar包 ?????????????????? byte[] buff = new byte[4096]; ?????????????????? FileInputStream jarIns = new FileInputStream(jarPath); ?????????????????? so.write("jarfile".getBytes()); ?????????????????? int read = 0; ?????????????????? while ((read=jarIns.read(buff)) != -1) { ??????????????????????????? so.write(buff,0,read); ?????????????????? } ?????????????????? jarIns.close(); ?????????????????? so.close(); ?????????????????? socket.close(); ?????????????????? ?????????????????? // 發(fā)送job.conf文件 ?????????????????? socket = new Socket(conf.get(Constants.WORKER_HOST), Integer.parseInt(conf.get(Constants.WORKER_PORT))); ?????????????????? so = socket.getOutputStream(); ?????????????????? ?????????????????? FileInputStream confIns = new FileInputStream(Constants.CONF_FILE); ?????????????????? so.write("jobconf".getBytes()); ?????????????????? while ((read = confIns.read(buff)) != -1) { ??????????????????????????? so.write(buff,0,read); ?????????????????? } ?????????????????? confIns.close(); ?????????????????? so.close(); ?????????????????? socket.close(); ? ?????????????????? // 發(fā)送啟動命令 ?????????????????? socket = new Socket(conf.get(Constants.WORKER_HOST), Integer.parseInt(conf.get(Constants.WORKER_PORT))); ?????????????????? so = socket.getOutputStream(); ?????????????????? so.write("job2run".getBytes()); ?????????????????? String shell = "java -cp E:/test.jar cn.toto.bigdata.mymr.task.TaskProcessor"; ?????????????????? so.write(shell.getBytes()); ?????????????????? so.close(); ?????????????????? socket.close(); ???????? } } | ||
package cn.toto.bigdata.mymr.scheduler; ? import java.net.ServerSocket; import java.net.Socket; ? public class WorkerServer { ? ???????? public static void main(String[] args) throws Exception { ?????????????????? ServerSocket ssc = new ServerSocket(9889); ?????????????????? System.out.println("Worker服務器啟動-->9889"); ?????????????????? while (true) { ??????????????????????????? Socket accept = ssc.accept(); ??????????????????????????? new Thread(new WorkerRunnable(accept)).start(); ?????????????????? } ???????? } } | ||
package cn.toto.bigdata.mymr.scheduler; ? import java.io.BufferedReader; import java.io.File; import java.io.FileOutputStream; import java.io.InputStream; import java.io.InputStreamReader; import java.net.Socket; ? import cn.toto.bigdata.mymr.common.Constants; ? public class WorkerRunnable implements Runnable { ???????? Socket socket; ???????? InputStream in = null; ???????? volatile long confSize = 0; ???????? volatile long jarSize = 0; ? ???????? public WorkerRunnable(Socket socket) { ?????????????????? this.socket = socket; ???????? } ? ???????? @Override ???????? public void run() { ?????????????????? try { ??????????????????????????? this.in = socket.getInputStream(); ??????????????????????????? byte[] protocal = new byte[7]; ??????????????????????????? int read = in.read(protocal, 0, 7); ??????????????????????????? if (read < 7) { ???????????????????????????????????? System.out.println("客戶端請求不符合協(xié)議規(guī)范......"); ???????????????????????????????????? return; ??????????????????????????? } ??????????????????????????? String command = new String(protocal); ??????????????????????????? switch (command) { ??????????????????????????? case "jarfile": ???????????????????????????????????? receiveJarFile(); ???????????????????????????????????? break; ??????????????????????????? case "jobconf": ???????????????????????????????????? receiveConfFile(); ???????????????????????????????????? break; ??????????????????????????? case "job2run": ???????????????????????????????????? runJob(); ???????????????????????????????????? break; ??????????????????????????? default: ???????????????????????????????????? System.out.println("客戶端請求不符合協(xié)議規(guī)范....."); ???????????????????????????????????? socket.close(); ???????????????????????????????????? break; ??????????????????????????? } ? ?????????????????? } catch (Exception e) { ??????????????????????????? e.printStackTrace(); ?????????????????? } ? ???????? } ? ???????? private void receiveConfFile() throws Exception { ?????????????????? System.out.println("開始接收conf文件"); ?????????????????? FileOutputStream fo = new FileOutputStream(Constants.TASK_WORK_DIR + "/" + Constants.CONF_FILE); ?????????????????? byte[] buff = new byte[4096]; ?????????????????? int read = 0; ?????????????????? while ((read = in.read(buff)) != -1) { ??????????????????????????? confSize += read; ??????????????????????????? fo.write(buff, 0, read); ?????????????????? } ?????????????????? fo.flush(); ?????????????????? fo.close(); ?????????????????? in.close(); ?????????????????? socket.close(); ? ???????? } ? ???????? private void receiveJarFile() throws Exception { ?????????????????? System.out.println("開始接收jar文件"); ?????????????????? FileOutputStream fo = new FileOutputStream(Constants.TASK_WORK_DIR + "/" + Constants.JAR_FILE); ?????????????????? byte[] buff = new byte[4096]; ?????????????????? int read = 0; ?????????????????? while ((read = in.read(buff)) != -1) { ??????????????????????????? jarSize += read; ??????????????????????????? fo.write(buff, 0, read); ?????????????????? } ?????????????????? fo.flush(); ?????????????????? fo.close(); ?????????????????? in.close(); ?????????????????? socket.close(); ? ???????? } ? ???????? private void runJob() throws Exception { ? ?????????????????? byte[] buff = new byte[4096]; ?????????????????? int read = in.read(buff); ?????????????????? String shell = new String(buff, 0, read); ?????????????????? System.out.println("接收到啟動命令......." + shell); ?????????????????? in.close(); ?????????????????? socket.close(); ?????????????????? Thread.sleep(500); ? ?????????????????? File jarFile = new File(Constants.TASK_WORK_DIR + "/" + Constants.JAR_FILE); ?????????????????? File confFile = new File(Constants.TASK_WORK_DIR + "/" + Constants.CONF_FILE); ?????????????????? System.out.println("jarfile 存在?" + jarFile.exists()); ?????????????????? System.out.println("confFile 存在?" + confFile.exists()); ?????????????????? System.out.println("jarfile可讀?" + jarFile.canRead()); ?????????????????? System.out.println("jarfile可寫?" + jarFile.canWrite()); ?????????????????? System.out.println("confFile可讀?" + confFile.canRead()); ?????????????????? System.out.println("confFile可寫?" + confFile.canWrite()); ? ?????????????????? System.out.println("jarFile.length():" + jarFile.length()); ?????????????????? System.out.println("confFile.length():" + confFile.length()); ? ?????????????????? /*if (jarFile.length() == jarSize && confFile.length() == confSize) { ??????????????????????????? System.out.println("jar 和 conf 文件已經準備就緒......"); ?????????????????? }*/ ?????????????????? System.out.println("開始啟動數據處理TaskProcessor......"); ? ?????????????????? Process exec = Runtime.getRuntime().exec(shell); ?????????????????? int waitFor = exec.waitFor(); ?????????????????? ?????????????????? InputStream errStream = exec.getErrorStream(); ?????????????????? BufferedReader errReader = new BufferedReader(new InputStreamReader(errStream)); ?????????????????? String inLine = null; ?????????????????? /* ?????????????????? ?* InputStream stdStream = exec.getInputStream(); BufferedReader ?????????????????? ?* stdReader = new BufferedReader(new InputStreamReader(stdStream)); ?????????????????? ?* while ((inLine = stdReader.readLine()) != null) { ?????????????????? ?* System.out.println(inLine); } ?????????????????? ?*/ ?????????????????? while ((inLine = errReader.readLine()) != null) { ??????????????????????????? System.out.println(inLine); ?????????????????? } ?????????????????? ?????????????????? if (waitFor == 0) { ??????????????????????????? System.out.println("task成功運行完畢....."); ?????????????????? } else { ??????????????????????????? System.out.println("task異常退出......"); ?????????????????? } ? ???????? } ? } | ||
?
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的模拟MapReduce编程的程序案例(用于统计文本中单词出现频率)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 顶分型什么意思
- 下一篇: 支付宝拉黑和删除的区别