使用jclouds在S3上分段上传
1.目標
在上一篇文章中 ,我們研究了如何使用jclouds中的通用Blob API將內容上傳到S3。 在本文中,我們將使用jclouds的S3特定的異步API上傳內容并利用S3提供的分段上傳功能。
2.準備
2.1。 設置自定義API
上傳過程的第一部分是創建jclouds API-這是針對Amazon S3的自定義API:
public AWSS3AsyncClient s3AsyncClient() {String identity = ...String credentials = ...BlobStoreContext context = ContextBuilder.newBuilder('aws-s3').credentials(identity, credentials).buildView(BlobStoreContext.class);RestContext<AWSS3Client, AWSS3AsyncClient> providerContext = context.unwrap();return providerContext.getAsyncApi(); }2.2。 確定內容的零件數
Amazon S3對于要上傳的每個部分都有5 MB的限制。 因此,我們需要做的第一件事就是確定可以分割內容的適當數量的部分,以使我們沒有低于5 MB限制的部分:
public static int getMaximumNumberOfParts(byte[] byteArray) {int numberOfParts= byteArray.length / fiveMB; // 5*1024*1024if (numberOfParts== 0) {return 1;}return numberOfParts; }2.3。 將內容分成幾部分
將把字節數組分成一定數量的部分:
public static List<byte[]> breakByteArrayIntoParts(byte[] byteArray, int maxNumberOfParts) {List<byte[]> parts = Lists.<byte[]> newArrayListWithCapacity(maxNumberOfParts);int fullSize = byteArray.length;long dimensionOfPart = fullSize / maxNumberOfParts;for (int i = 0; i < maxNumberOfParts; i++) {int previousSplitPoint = (int) (dimensionOfPart * i);int splitPoint = (int) (dimensionOfPart * (i + 1));if (i == (maxNumberOfParts - 1)) {splitPoint = fullSize;}byte[] partBytes = Arrays.copyOfRange(byteArray, previousSplitPoint, splitPoint);parts.add(partBytes);}return parts; }我們將測試將字節數組分成多個部分的邏輯–我們將生成一些字節,將字節數組拆分,使用Guava將其重新組合在一起,并驗證是否可以獲取原始字節:
@Test public void given16MByteArray_whenFileBytesAreSplitInto3_thenTheSplitIsCorrect() {byte[] byteArray = randomByteData(16);int maximumNumberOfParts = S3Util.getMaximumNumberOfParts(byteArray);List<byte[]> fileParts = S3Util.breakByteArrayIntoParts(byteArray, maximumNumberOfParts);assertThat(fileParts.get(0).length + fileParts.get(1).length + fileParts.get(2).length,equalTo(byteArray.length));byte[] unmultiplexed = Bytes.concat(fileParts.get(0), fileParts.get(1), fileParts.get(2));assertThat(byteArray, equalTo(unmultiplexed)); }要生成數據,我們只需使用Random的支持:
byte[] randomByteData(int mb) {byte[] randomBytes = new byte[mb * 1024 * 1024];new Random().nextBytes(randomBytes);return randomBytes; }2.4。 創建有效載荷
既然我們已經為內容確定了正確的部分數量,并且設法將內容分解為多個部分,那么我們需要為jclouds API 生成Payload對象 :
public static List<Payload> createPayloadsOutOfParts(Iterable<byte[]> fileParts) {List<Payload> payloads = Lists.newArrayList();for (byte[] filePart : fileParts) {byte[] partMd5Bytes = Hashing.md5().hashBytes(filePart).asBytes();Payload partPayload = Payloads.newByteArrayPayload(filePart);partPayload.getContentMetadata().setContentLength((long) filePart.length);partPayload.getContentMetadata().setContentMD5(partMd5Bytes);payloads.add(partPayload);}return payloads; }3.上載
上傳過程是一個靈活的多步驟過程–這意味著:
- 可以在擁有所有數據之前開始上載–可以在輸入數據時上載數據
- 數據分塊上傳-如果這些操作之一失敗,則可以簡單地將其檢索
- 塊可以并行上傳–這可以大大提高上傳速度,尤其是在大文件的情況下
3.1。 啟動上傳操作
上傳操作的第一步是啟動該過程 。 對S3的請求必須包含標準的HTTP標頭–特別是內容 – MD5標頭。 我們將在這里使用Guava哈希函數支持:
Hashing.md5().hashBytes(byteArray).asBytes();這是整個字節數組(而不是各個部分)的md5哈希 。
為了啟動上載以及與S3的所有進一步交互,我們將使用AWSS3AsyncClient –我們之前創建的異步API:
ObjectMetadata metadata = ObjectMetadataBuilder.create().key(key).contentMD5(md5Bytes).build(); String uploadId = s3AsyncApi.initiateMultipartUpload(container, metadata).get();密鑰是分配給對象的句柄–它必須是客戶端指定的唯一標識符。
還要注意,即使我們使用的是異步版本的API, 我們也阻止了該操作的結果–這是因為我們需要初始化的結果才能繼續前進。
操作的結果是S3返回的上傳ID –這將在整個生命周期中識別上傳,并將出現在所有后續的上傳操作中。
3.2。 上載零件
下一步是上傳零件 。 我們的目標是并行發送這些請求,因為上載零件操作代表了大部分上載過程:
List<ListenableFuture<String>> ongoingOperations = Lists.newArrayList(); for (int partNumber = 0; partNumber < filePartsAsByteArrays.size(); partNumber++) {ListenableFuture<String> future = s3AsyncApi.uploadPart(container, key, partNumber + 1, uploadId, payloads.get(partNumber));ongoingOperations.add(future); }零件編號必須是連續的,但發送請求的順序無關緊要。
提交所有上傳零件請求后,我們需要等待它們的響應,以便我們可以收集每個零件的單獨ETag值:
Function<ListenableFuture<String>, String> getEtagFromOp =new Function<ListenableFuture<String>, String>() {public String apply(ListenableFuture<String> ongoingOperation) {try {return ongoingOperation.get();} catch (InterruptedException | ExecutionException e) {throw new IllegalStateException(e);}} }; List<String> etagsOfParts = Lists.transform(ongoingOperations, getEtagFromOp);如果由于某種原因,上載部分操作之一失敗, 則可以重試該操作,直到成功為止。 上面的邏輯不包含重試機制,但是建立它應該足夠簡單。
3.3。 完成上傳操作
上傳過程的最后一步是完成分段操作 。 S3 API要求以Map的形式上傳來自先前零件的響應,現在我們可以輕松地從上面獲得的ETag列表中創建這些響應:
Map<Integer, String> parts = Maps.newHashMap(); for (int i = 0; i < etagsOfParts.size(); i++) {parts.put(i + 1, etagsOfParts.get(i)); }最后,發送完整的請求:
s3AsyncApi.completeMultipartUpload(container, key, uploadId, parts).get();這將返回完成對象的最終ETag,并完成整個上傳過程。
4。結論
在本文中,我們使用自定義S3 jclouds API構建了一個支持多部分的,完全并行的S3上傳操作。 此操作可以按原樣使用,但是可以通過幾種方法進行改進 。 首先,應在上傳操作周圍添加重試邏輯,以更好地處理故障。 接下來,對于非常大的文件,即使該機制正在并行發送所有上載的多部分請求, 限制機制仍應限制發送的并行請求的數量。 這既可以避免帶寬成為瓶頸,又可以確保Amazon本身不會將上傳過程標記為超過每秒允許的請求限制– Guava RateLimiter可能非常適合此操作。
翻譯自: https://www.javacodegeeks.com/2013/04/multipart-upload-on-s3-with-jclouds.html
總結
以上是生活随笔為你收集整理的使用jclouds在S3上分段上传的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 主管部门是什么意思 主管部门的含义
- 下一篇: 出口贸易备案登记表(出口贸易备案)