In this post, we are going to explore how to use AWS S3 with asynchronous programming in a Spring WebFlux REST API.

· Prerequisites
· Overview
∘ What is Amazon S3?
∘ Setting Up S3 bucket
· Spring Webflux Application
∘ Configuring S3 clients
∘ Upload Object
∘ Download the file from AWS S3
∘ AWSS3Controller
· Test the REST APIs
· Conclusion

The AWS SDK for Java 1.x has asynchronous clients that are wrappers around a thread pool and blocking synchronous clients, so they don't provide the full benefit of non-blocking I/O. The AWS SDK for Java 2.x features truly non-blocking asynchronous clients that implement high concurrency across a few threads.

Prerequisites

This is the list of all the prerequisites for following this story:

  • Spring Boot 3
  • Maven 3.8+
  • Java 17
  • An active AWS account with access to the S3 service.
  • Postman, Insomnia, or any other API testing tool.
  • Optionally, LocalStack to run S3 locally
  • Optionally, Docker and Docker compose

Overview

What is Amazon S3?

Amazon S3 (Simple Storage Service) is an object storage service that helps developers and IT teams store, back up, archive, and retrieve data from anywhere on the web. It allows administrators to organize data into buckets, add tags to objects, configure access controls for multiple clients, perform high-volume data analysis, gain insight into storage usage, and measure trends based on activity.

Setting Up S3 bucket

For this story, we run the LocalStack Docker image locally to emulate the S3 service (and other AWS services) on our own machine.
For a setup using the AWS Management Console instead, see my previous story.

LocalStack is a cloud service emulator that runs in a single container on your laptop or in your CI environment. With LocalStack, you can run your AWS applications or Lambdas entirely on your local machine without connecting to a remote cloud provider! Whether you are testing complex CDK applications or Terraform configurations, or just beginning to learn about AWS services, LocalStack helps speed up and simplify your testing and development workflow. — https://github.com/localstack/localstack

There are several ways to install LocalStack (LocalStack CLI, LocalStack Cockpit, Docker, Docker Compose, Helm). We'll use Docker Compose: a docker-compose YAML file containing all the instructions to start the LocalStack container.

version: "3.8"

services:
localstack:
container_name: localstack_main
image: localstack/localstack:latest
ports:
- "127.0.0.1:4566:4566" # LocalStack Gateway
- "127.0.0.1:4510-4559:4510-4559" # external services port range
environment:
- DEBUG=1
- AWS_ACCESS_KEY_ID=test
- AWS_SECRET_ACCESS_KEY=test
- AWS_DEFAULT_REGION=eu-west-1 # This is the region where your localstack mocks to be running
- DOCKER_HOST=unix:///var/run/docker.sock
volumes:
- "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack"
- "/var/run/docker.sock:/var/run/docker.sock"
- ./aws/init-aws.sh:/etc/localstack/init/ready.d/init-aws.sh

The last volume entry mounts an initialization script, aws/init-aws.sh, which LocalStack runs once the container is ready. We use it to create the bucket:

#!/bin/bash
awslocal s3 mb s3://my-test-bucket

Open your CLI and run the following command:

docker-compose up -d

The local S3 service is now available on port 4566, with a bucket named "my-test-bucket".
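
If you have the AWS CLI installed, you can verify that the bucket exists by pointing the CLI at the LocalStack endpoint:

aws --endpoint-url=http://localhost:4566 s3 ls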

Spring Webflux Application

Let’s start by creating a simple Spring Reactive project from start.spring.io, with the following dependencies: Spring Reactive Web and Lombok.

Configuring S3 clients

To implement the integration with AWS, we need to add the AWS SDK for Java V2 dependencies in the pom.xml file.

<!-- AWS SDK Java V2  -->
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>s3</artifactId>
    <version>2.18.41</version>
</dependency>
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>netty-nio-client</artifactId>
    <version>2.18.41</version>
</dependency>
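
Optionally, instead of pinning the version on each artifact, you can import the AWS SDK's Maven BOM in dependencyManagement and drop the individual <version> tags:

<dependencyManagement>
    <dependencies>
        <!-- AWS SDK Java V2 bill of materials -->
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>bom</artifactId>
            <version>2.18.41</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>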

The second important step is creating the S3AsyncClient.

First, add the AWS S3 credentials and settings to the application YAML file.

# AWS properties
aws:
  access-key: test
  secret-key: test
  region: eu-west-1
  s3-bucket-name: my-test-bucket
  multipart-min-part-size: 5242880 # 5MB
  endpoint: http://localhost:4566/

  • aws.region – AWS region.
  • aws.access-key – AWS access key ID.
  • aws.secret-key – AWS secret access key.
  • aws.endpoint – Overrides the S3 client to use a local instance instead of the AWS service.
  • aws.s3-bucket-name – Name of the S3 bucket.
  • aws.multipart-min-part-size – AWS S3 requires that each file part be at least 5 MB, except for the last part. — https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
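
These values are bound to a custom AwsProperties class, which the post doesn't show. Here is a minimal sketch, assuming Lombok; the field names are inferred from the getters used in the code below.

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

// Hypothetical sketch of the AwsProperties class (not shown in the post);
// kebab-case keys in the YAML bind to these camelCase fields.
@Data
@Configuration
@ConfigurationProperties(prefix = "aws")
public class AwsProperties {
    private String accessKey;
    private String secretKey;
    private String region;
    private String s3BucketName;
    private String endpoint;
    private int multipartMinPartSize;
}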

Then, we need to create the service client for accessing Amazon S3 asynchronously with S3AsyncClient.

import java.net.URI;
import java.time.Duration;

import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Configuration;

@RequiredArgsConstructor
@Configuration
public class AwsS3Config {

    private final AwsProperties s3ConfigProperties;

    @Bean
    public S3AsyncClient s3AsyncClient(AwsCredentialsProvider awsCredentialsProvider) {

        return S3AsyncClient.builder()
                .httpClient(sdkAsyncHttpClient())
                .region(Region.of(s3ConfigProperties.getRegion()))
                .credentialsProvider(awsCredentialsProvider)
                .endpointOverride(URI.create(s3ConfigProperties.getEndpoint()))
                .forcePathStyle(true)
                .serviceConfiguration(s3Configuration()).build();
    }

    private SdkAsyncHttpClient sdkAsyncHttpClient() {
        return NettyNioAsyncHttpClient.builder()
                .writeTimeout(Duration.ZERO)
                .maxConcurrency(64)
                .build();
    }

    private S3Configuration s3Configuration() {
        return S3Configuration.builder()
                .checksumValidationEnabled(false)
                .chunkedEncodingEnabled(true)
                .build();
    }

    @Bean
    AwsCredentialsProvider awsCredentialsProvider() {
        return () -> AwsBasicCredentials.create(s3ConfigProperties.getAccessKey(), s3ConfigProperties.getSecretKey());
    }

}

We created a bean method returning an S3AsyncClient with basic configuration. The credentials come from the custom AwsCredentialsProvider bean, and the endpoint is overridden to point at our LocalStack instance. Note that forcePathStyle(true) matters for LocalStack, since virtual-hosted-style bucket URLs generally don't resolve against localhost.

The methods for an asynchronous client in V2 of the AWS SDK for Java return CompletableFuture objects that allow us to access the response when it’s ready.
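
Reactor can bridge those futures directly into a reactive pipeline with Mono.fromFuture. A minimal sketch (the wrapper class and method are illustrative, not part of the post's code):

import reactor.core.publisher.Mono;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.ListBucketsResponse;

// Illustrative sketch: adapt an SDK CompletableFuture into a Mono so the
// response is consumed without blocking a thread.
public class FutureBridgeExample {

    public static Mono<ListBucketsResponse> listBuckets(S3AsyncClient s3AsyncClient) {
        // listBuckets() returns CompletableFuture<ListBucketsResponse>;
        // the supplier variant of Mono.fromFuture defers the call until subscription.
        return Mono.fromFuture(s3AsyncClient::listBuckets);
    }
}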

Upload Object

Let's implement the service method that uploads an object to S3 using a multipart upload.

/**
 * {@inheritDoc}
 */
@Override
public Mono<FileResponse> uploadObject(FilePart filePart) {

    String filename = filePart.filename();

    Map<String, String> metadata = Map.of("filename", filename);
    // get media type
    MediaType mediaType = ObjectUtils.defaultIfNull(filePart.headers().getContentType(), MediaType.APPLICATION_OCTET_STREAM);

    CompletableFuture<CreateMultipartUploadResponse> s3AsyncClientMultipartUpload = s3AsyncClient
            .createMultipartUpload(CreateMultipartUploadRequest.builder()
                    .contentType(mediaType.toString())
                    .key(filename)
                    .metadata(metadata)
                    .bucket(s3ConfigProperties.getS3BucketName())
                    .build());

    UploadStatus uploadStatus = new UploadStatus(mediaType.toString(), filename);

    return Mono.fromFuture(s3AsyncClientMultipartUpload)
            .flatMapMany(response -> {
                FileUtils.checkSdkResponse(response);
                uploadStatus.setUploadId(response.uploadId());
                LOGGER.info("Upload object with ID={}", response.uploadId());
                return filePart.content();
            })
            .bufferUntil(dataBuffer -> {
                // Collect incoming values into multiple List buffers that will be emitted by the resulting Flux each time the given predicate returns true.
                uploadStatus.addBuffered(dataBuffer.readableByteCount());

                if (uploadStatus.getBuffered() >= s3ConfigProperties.getMultipartMinPartSize()) {
                    LOGGER.info("BufferUntil - returning true, bufferedBytes={}, partCounter={}, uploadId={}",
                            uploadStatus.getBuffered(), uploadStatus.getPartCounter(), uploadStatus.getUploadId());

                    // reset buffer
                    uploadStatus.setBuffered(0);
                    return true;
                }

                return false;
            })
            .map(FileUtils::dataBufferToByteBuffer)
            // upload part
            .flatMap(byteBuffer -> uploadPartObject(uploadStatus, byteBuffer))
            .onBackpressureBuffer()
            .reduce(uploadStatus, (status, completedPart) -> {
                LOGGER.info("Completed: PartNumber={}, etag={}", completedPart.partNumber(), completedPart.eTag());
                status.getCompletedParts().put(completedPart.partNumber(), completedPart);
                return status;
            })
            .flatMap(this::completeMultipartUpload)
            .map(response -> {
                FileUtils.checkSdkResponse(response);
                LOGGER.info("upload result: {}", response.toString());
                return new FileResponse(filename, uploadStatus.getUploadId(), response.location(), uploadStatus.getContentType(), response.eTag());
            });
}
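
The UploadStatus object used above is a mutable holder that tracks the upload across the pipeline. It isn't shown in the post; here is a plausible minimal sketch, assuming Lombok, with fields inferred from the accessors used in the service code:

import java.util.HashMap;
import java.util.Map;

import lombok.Data;
import software.amazon.awssdk.services.s3.model.CompletedPart;

// Hypothetical sketch of the UploadStatus holder (not shown in the post).
@Data
public class UploadStatus {

    private final String contentType;
    private final String fileKey;
    private String uploadId;
    private int partCounter;
    private int buffered;
    private final Map<Integer, CompletedPart> completedParts = new HashMap<>();

    public void addBuffered(int bytes) {
        buffered += bytes;
    }

    // increments the part counter and returns the new part number
    public int getAddedPartCounter() {
        return ++partCounter;
    }
}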

The uploadObject method takes as a parameter the part of the request containing the file to save. A Part represents a part in a multipart/form-data request; it can be either a FilePart or a FormFieldPart.

Then, we prepare the request with the createMultipartUpload method. This action initiates a multipart upload and returns an upload ID, which is used to associate all of the parts of the specific multipart upload passed to the SDK's uploadPart method.

/**
 * Uploads a part in a multipart upload.
 */
private Mono<CompletedPart> uploadPartObject(UploadStatus uploadStatus, ByteBuffer buffer) {
    final int partNumber = uploadStatus.getAddedPartCounter();
    LOGGER.info("UploadPart - partNumber={}, contentLength={}", partNumber, buffer.capacity());

    CompletableFuture<UploadPartResponse> uploadPartResponseCompletableFuture = s3AsyncClient.uploadPart(UploadPartRequest.builder()
                    .bucket(s3ConfigProperties.getS3BucketName())
                    .key(uploadStatus.getFileKey())
                    .partNumber(partNumber)
                    .uploadId(uploadStatus.getUploadId())
                    .contentLength((long) buffer.capacity())
                    .build(),
            AsyncRequestBody.fromPublisher(Mono.just(buffer)));

    return Mono
            .fromFuture(uploadPartResponseCompletableFuture)
            .map(uploadPartResult -> {
                FileUtils.checkSdkResponse(uploadPartResult);
                LOGGER.info("UploadPart - complete: part={}, etag={}", partNumber, uploadPartResult.eTag());
                return CompletedPart.builder()
                        .eTag(uploadPartResult.eTag())
                        .partNumber(partNumber)
                        .build();
            });
}
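
The FileUtils.dataBufferToByteBuffer helper used in the pipeline is not shown in the post either. A minimal sketch, assuming it merges the List<DataBuffer> emitted by bufferUntil into a single ByteBuffer:

import java.nio.ByteBuffer;
import java.util.List;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;

// Hypothetical sketch: merge buffered DataBuffers into one ByteBuffer for a part upload.
public static ByteBuffer dataBufferToByteBuffer(List<DataBuffer> buffers) {
    int partSize = buffers.stream().mapToInt(DataBuffer::readableByteCount).sum();
    ByteBuffer partData = ByteBuffer.allocate(partSize);
    buffers.forEach(buffer -> {
        partData.put(buffer.toByteBuffer());
        DataBufferUtils.release(buffer); // avoid leaking pooled buffers
    });
    partData.rewind(); // reset the position so the SDK reads from the beginning
    return partData;
}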

When all ETags are received, we complete the multipart upload by assembling the previously uploaded parts with the completeMultipartUpload method.

/**
 * This method is called when all parts have been uploaded. It completes the multipart upload
 * by sending S3 the collected part numbers and their ETags.
 */
private Mono<CompleteMultipartUploadResponse> completeMultipartUpload(UploadStatus uploadStatus) {
    LOGGER.info("CompleteUpload - fileKey={}, completedParts.size={}",
            uploadStatus.getFileKey(), uploadStatus.getCompletedParts().size());

    CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder()
            .parts(uploadStatus.getCompletedParts().values())
            .build();

    return Mono.fromFuture(s3AsyncClient.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
            .bucket(s3ConfigProperties.getS3BucketName())
            .uploadId(uploadStatus.getUploadId())
            .multipartUpload(multipartUpload)
            .key(uploadStatus.getFileKey())
            .build()));
}

Download the file from AWS S3

The AWS SDK provides the getObject() method to retrieve objects from Amazon S3.

@Override
public Mono<byte[]> getByteObject(@NotNull String key) {
    LOGGER.debug("Fetching object as byte array from S3 bucket: {}, key: {}", s3ConfigProperties.getS3BucketName(), key);
    return Mono.just(GetObjectRequest.builder().bucket(s3ConfigProperties.getS3BucketName()).key(key).build())
            .map(it -> s3AsyncClient.getObject(it, AsyncResponseTransformer.toBytes()))
            .flatMap(Mono::fromFuture)
            .map(BytesWrapper::asByteArray);
}
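
Note that AsyncResponseTransformer.toBytes() buffers the entire object in memory. For large files you may prefer to stream the body instead; a sketch using AsyncResponseTransformer.toPublisher() (getObjectAsStream is an illustrative name, assuming the same s3AsyncClient and s3ConfigProperties fields as above):

import java.nio.ByteBuffer;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

// Sketch: stream the object body as ByteBuffer chunks instead of one byte[].
public Flux<ByteBuffer> getObjectAsStream(String key) {
    GetObjectRequest request = GetObjectRequest.builder()
            .bucket(s3ConfigProperties.getS3BucketName())
            .key(key)
            .build();
    return Mono.fromFuture(s3AsyncClient.getObject(request, AsyncResponseTransformer.toPublisher()))
            .flatMapMany(Flux::from); // ResponsePublisher<GetObjectResponse> publishes the body chunks
}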

AWSS3Controller

Let’s see all the methods of our controller class.

@RequiredArgsConstructor
@RestController
@RequestMapping("/object")
@Validated
public class AWSS3Controller {

    private final AWSS3FileStorageService fileStorageService;

    @PostMapping("/upload")
    public Mono<SuccessResponse> upload(@RequestPart("file-data") Mono<FilePart> filePart) {
        return filePart
                .doOnNext(FileUtils::filePartValidator)
                .flatMap(fileStorageService::uploadObject)
                .map(fileResponse -> new SuccessResponse(fileResponse, "Uploaded successfully"));
    }

    @GetMapping(path="/{fileKey}")
    public Mono<SuccessResponse> download(@PathVariable("fileKey") String fileKey) {

        return fileStorageService.getByteObject(fileKey)
                .map(objectKey -> new SuccessResponse(objectKey, "Object byte response"));
    }

    @DeleteMapping(path="/{objectKey}")
    public Mono<SuccessResponse> deleteFile(@PathVariable("objectKey") String objectKey) {
        return fileStorageService.deleteObject(objectKey)
                .map(resp -> new SuccessResponse(null, MessageFormat.format("Object with key: {0} deleted successfully", objectKey)));
    }

    @GetMapping
    public Flux<SuccessResponse> getObjects() {
        return fileStorageService.getObjects()
                .map(objectKey -> new SuccessResponse(objectKey, "Result found"));
    }
}
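
The deleteObject and getObjects service methods called by the controller are not shown in the post. Here is a plausible sketch of each, assuming the same s3AsyncClient, s3ConfigProperties, and FileUtils helpers as before:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.S3Object;

// Hypothetical sketch: delete a single object by key.
public Mono<DeleteObjectResponse> deleteObject(String objectKey) {
    return Mono.fromFuture(s3AsyncClient.deleteObject(DeleteObjectRequest.builder()
                    .bucket(s3ConfigProperties.getS3BucketName())
                    .key(objectKey)
                    .build()))
            .doOnNext(FileUtils::checkSdkResponse);
}

// Hypothetical sketch: list the keys of up to 1,000 objects in the bucket.
public Flux<String> getObjects() {
    return Mono.fromFuture(s3AsyncClient.listObjectsV2(ListObjectsV2Request.builder()
                    .bucket(s3ConfigProperties.getS3BucketName())
                    .build()))
            .flatMapMany(response -> Flux.fromIterable(response.contents()))
            .map(S3Object::key);
}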

Test the REST APIs

With the application up and running, we can exercise each endpoint (curl examples below):

  • Upload an object
  • Retrieve an object by its fileKey
  • Get some or all (up to 1,000) of the objects in the bucket
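
For example, with curl (assuming the application runs on Spring Boot's default port 8080; sample.pdf is an illustrative file name):

# upload a file as multipart/form-data (the part name must be "file-data")
curl -F "file-data=@./sample.pdf" http://localhost:8080/object/upload

# download the object by its key
curl http://localhost:8080/object/sample.pdf

# list the objects in the bucket
curl http://localhost:8080/object

# delete the object
curl -X DELETE http://localhost:8080/object/sample.pdf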

Conclusion

In this post, we've explored an asynchronous AWS S3 integration using Spring WebFlux as the backend.

The complete source code is available on GitHub.

Happy coding!
