生活随笔
收集整理的這篇文章主要介紹了
                                
玩Java并发
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.                        
 
                                
                            
                            
                              最近,我需要將一些文件(每個文件都有JSON格式的對象列表(數組))轉換為每個文件都具有相同數據(對象)的分隔行的文件。 這是一次性的任務,很簡單。 我使用Java nio的某些功能進行了讀寫。 我以最簡單的方式使用了GSON。 一個線程在文件上運行,轉換和寫入。 整個操作在幾秒鐘內完成。 但是,我想并發一點。 因此,我增強了可同時工作的工具。 
 
 線程數
  可運行以讀取文件。 
  閱讀器線程將提交到ExecutorService。 輸出是對象列表(示例中為User),將被放入BlockingQueue。 
  可運行以寫入文件。 
  每個可運行對象將從阻塞隊列中輪詢。 它將數據行寫入文件。 我沒有將編寫器Runnable添加到ExecutorService,而是僅使用它啟動了一個線程。 Runnable具有while(some boolen is true) {...}模式。 有關以下內容的更多信息... 
 同步一切
  BlockingQueue是這兩種線程的接口。 由于writer的runnable在while循環(消費者)中運行,我希望能夠使其停止,以便該工具終止。 因此,我為此使用了兩個對象: 
  信號 
  讀取輸入文件的循環會增加一個計數器。 完成遍歷輸入文件并提交編寫器后,我在主線程中初始化了一個信號燈: semaphore.acquire(numberOfFiles); 
  在每個可運行的閱讀器中,我釋放了信號量: semaphore.release(); 
  原子布爾 
  作者的while循環使用AtomicBoolean。 只要AtomicBoolean == true,編寫器將繼續。 在主線程中,在獲取信號量之后,我將AtomicBoolean設置為false。 這使編寫器線程可以終止。 
 使用Java NIO
  為了掃描,讀取和寫入文件系統,我使用了Java NIO的某些功能。 
  掃描: Files.newDirectoryStream(inputFilesDirectory, "*.json"); 
 開始之前刪除輸出目錄: Files.walkFileTree... 
 BufferedReader和BufferedWriter: Files.newBufferedReader(filePath); Files.newBufferedWriter(fileOutputPath, Charset.defaultCharset()); 
  一注。 為了生成此示例的隨機文件,我使用了apache commons lang: RandomStringUtils.randomAlphabetic 
 GitHub中的所有代碼。 
 public class JsonArrayToJsonLines {private final static Path inputFilesDirectory = Paths.get("src\\main\\resources\\files");private final static Path outputDirectory = Paths.get("src\\main\\resources\\files\\output");private final static Gson gson = new Gson();private final BlockingQueue<EntitiesData> entitiesQueue = new LinkedBlockingQueue<>();private AtomicBoolean stillWorking = new AtomicBoolean(true);private Semaphore semaphore = new Semaphore(0);int numberOfFiles = 0;private JsonArrayToJsonLines() {}public static void main(String[] args) throws IOException, InterruptedException {new JsonArrayToJsonLines().process();}private void process() throws IOException, InterruptedException {deleteFilesInOutputDir();final ExecutorService executorService = createExecutorService();DirectoryStream<Path> directoryStream = Files.newDirectoryStream(inputFilesDirectory, "*.json");for (int i = 0; i < 2; i++) {new Thread(new JsonElementsFileWriter(stillWorking, semaphore, entitiesQueue)).start();}directoryStream.forEach(new Consumer<Path>() {@Overridepublic void accept(Path filePath) {numberOfFiles++;executorService.submit(new OriginalFileReader(filePath, entitiesQueue));}});semaphore.acquire(numberOfFiles);stillWorking.set(false);shutDownExecutor(executorService);}private void deleteFilesInOutputDir() throws IOException {Files.walkFileTree(outputDirectory, new SimpleFileVisitor<Path>() {@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {Files.delete(file);return FileVisitResult.CONTINUE;}});}private ExecutorService createExecutorService() {int numberOfCpus = Runtime.getRuntime().availableProcessors();return Executors.newFixedThreadPool(numberOfCpus);}private void shutDownExecutor(final ExecutorService executorService) {executorService.shutdown();try {if (!executorService.awaitTermination(120, TimeUnit.SECONDS)) {executorService.shutdownNow();}if (!executorService.awaitTermination(120, TimeUnit.SECONDS)) {}} catch (InterruptedException ex) {executorService.shutdownNow();Thread.currentThread().interrupt();}}private static final class OriginalFileReader implements Runnable {private final Path filePath;private final BlockingQueue<EntitiesData> entitiesQueue;private OriginalFileReader(Path filePath, BlockingQueue<EntitiesData> entitiesQueue) {this.filePath = filePath;this.entitiesQueue = entitiesQueue;}@Overridepublic void run() {Path fileName = filePath.getFileName();try {BufferedReader br = Files.newBufferedReader(filePath);User[] entities = gson.fromJson(br, User[].class);System.out.println("---> " + fileName);entitiesQueue.put(new EntitiesData(fileName.toString(), entities));} catch (IOException | InterruptedException e) {throw new RuntimeException(filePath.toString(), e);}}}private static final class JsonElementsFileWriter implements Runnable {private final BlockingQueue<EntitiesData> entitiesQueue;private final AtomicBoolean stillWorking;private final Semaphore semaphore;private JsonElementsFileWriter(AtomicBoolean stillWorking, Semaphore semaphore,BlockingQueue<EntitiesData> entitiesQueue) {this.stillWorking = stillWorking;this.semaphore = semaphore;this.entitiesQueue = entitiesQueue;}@Overridepublic void run() {while (stillWorking.get()) {try {EntitiesData data = entitiesQueue.poll(100, TimeUnit.MILLISECONDS);if (data != null) {try {String fileOutput = outputDirectory.toString() + File.separator + data.fileName;Path fileOutputPath = Paths.get(fileOutput);BufferedWriter writer = Files.newBufferedWriter(fileOutputPath, Charset.defaultCharset());for (User user : data.entities) {writer.append(gson.toJson(user));writer.newLine();}writer.flush();System.out.println("=======================================>>>>> " + data.fileName);} catch (IOException e) {throw new RuntimeException(data.fileName, e);} finally {semaphore.release();}}} catch (InterruptedException e1) {}}}}private static final class EntitiesData {private final String fileName;private final User[] entities;private EntitiesData(String fileName, User[] entities) {this.fileName = fileName;this.entities = entities;}}
}
翻譯自: https://www.javacodegeeks.com/2014/12/playing-with-java-concurrency.html
                            總結
                            
                                以上是生活随笔為你收集整理的玩Java并发的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                            
                                如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。