Spring Data Reactive MongoDB aggregation pipeline

· Prerequisites
· Overview
∘ MongoDB Aggregation Framework
∘ How does the MongoDB Aggregation Framework work?
· Aggregation pipeline in Spring Reactive
∘ Project Setup
∘ With @Aggregation Annotation
∘ Aggregation With ReactiveMongoTemplate
· Testing
· Conclusion
· References

Prerequisites

To follow this story, you will need:

  • Java 17
  • Spring Boot Starter WebFlux 3.1.0
  • Maven 3.6.3
  • MongoDB 4.4 installed
  • Postman or Insomnia

Overview

MongoDB Aggregation Framework

The aggregation pipeline is a framework for data aggregation modeled on the concept of data processing pipelines. It lets us express complex queries against MongoDB by breaking their logic into a sequence of simpler operations.

Aggregation in MongoDB can transform documents and compute results in ways that go beyond what the find() command offers.

How does the MongoDB Aggregation Framework work?

The MongoDB Aggregation Framework builds a pipeline from multiple stages and expressions to perform analytic operations.

In the Aggregation Framework, we think in terms of stages rather than commands. Each stage performs an operation on its input documents and passes its output to the next stage. For example, we can build an aggregation pipeline that matches a set of documents based on some criteria, groups those documents together, sorts them, and then returns the result set to us.
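To make the idea of stages concrete, here is a minimal sketch in plain Java streams. It is only an analogy and does not use the MongoDB API: the Invoice and StatusTotal records, the run method, and the 2000 threshold are all hypothetical names and values chosen for illustration.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class PipelineAnalogy {

    // Hypothetical in-memory stand-in for a MongoDB document.
    record Invoice(String status, long amount) {}

    // One entry of the result set: a status and its summed amount.
    record StatusTotal(String status, long total) {}

    static List<StatusTotal> run(List<Invoice> invoices) {
        Map<String, Long> grouped = invoices.stream()
                // "Match" stage: keep only documents that satisfy the criteria.
                .filter(invoice -> invoice.amount() >= 2000)
                // "Group" stage: bucket by status and sum the amounts.
                .collect(Collectors.groupingBy(Invoice::status,
                        Collectors.summingLong(Invoice::amount)));

        // "Sort" stage: order the groups by total, largest first.
        return grouped.entrySet().stream()
                .map(entry -> new StatusTotal(entry.getKey(), entry.getValue()))
                .sorted(Comparator.comparingLong(StatusTotal::total).reversed())
                .toList();
    }

    public static void main(String[] args) {
        List<Invoice> invoices = List.of(
                new Invoice("CANCELED", 1730),
                new Invoice("PENDING", 6555),
                new Invoice("PENDING", 2100),
                new Invoice("PAID", 21200));

        // Prints:
        // StatusTotal[status=PAID, total=21200]
        // StatusTotal[status=PENDING, total=8655]
        run(invoices).forEach(System.out::println);
    }
}
```

Each step consumes the output of the previous one, which is the same mental model as a sequence of pipeline stages.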

Aggregation pipeline in Spring Reactive

Now that we have seen an overview of the MongoDB Aggregation Framework, let’s see how to apply it with Spring Reactive.

Consider an invoice collection with the following documents:

[
  {
    "status": "CANCELED",
    "invoiceDate": ISODate("2023-05-30T00:00:00.000Z"),
    "reference": "VND9IE5F",
    "amount": 1730,
    "customer": {
      "id": "6487547190ab5e1957a6baf4",
      "name": "Melanie"
    }
  },
  {
    "status": "PENDING",
    "invoiceDate": ISODate("2023-02-10T00:00:00.000Z"),
    "reference": "ZW441HFT",
    "amount": 6555,
    "customer": {
      "id": "64875411ec782e4f2bf1d782",
      "name": "Linda C. McKenzie"
    }
  },
  {
    "status": "PENDING",
    "invoiceDate": ISODate("2023-02-10T00:00:00.000Z"),
    "reference": "T1YN5RFO",
    "amount": 2100,
    "customer": {
      "id": "6487542548dbdd4b679adfed",
      "name": "Michelle"
    }
  },
  {
    "status": "PAID",
    "invoiceDate": ISODate("2023-02-10T00:00:00.000Z"),
    "reference": "UCAT5PPO",
    "amount": 21200,
    "customer": {
      "id": "63e6a3007a58c92799c84240",
      "name": "Ali"
    }
  },
  {
    "status": "PENDING",
    "invoiceDate": ISODate("2023-02-10T00:00:00.000Z"),
    "reference": "C4DE4X4P",
    "amount": 15010,
    "customer": {
      "id": "64875443507589915b6f4875",
      "name": "Jose"
    }
  },
  {
    "status": "PENDING",
    "invoiceDate": ISODate("2023-02-10T00:00:00.000Z"),
    "reference": "DZ5J8H2X",
    "amount": 6335,
    "customer": {
      "id": "6487544bb55647ea39e2d925",
      "name": "Jacques N. Richmond"
    }
  },
  {
    "status": "PAID",
    "invoiceDate": ISODate("2023-02-10T00:00:00.000Z"),
    "reference": "34PUY8BW",
    "amount": 18200,
    "customer": {
      "id": "63e6a3007a58c92799c84240",
      "name": "Ali"
    }
  },
  {
    "status": "CANCELED",
    "invoiceDate": ISODate("2023-02-10T00:00:00.000Z"),
    "reference": "FO5TND59",
    "amount": 2300,
    "customer": {
      "id": "64875452afbec6984152b9b5",
      "name": "Calvin"
    }
  },
  {
    "status": "CANCELED",
    "invoiceDate": ISODate("2023-02-10T00:00:00.000Z"),
    "reference": "60EZ6AM1",
    "amount": 6770,
    "customer": {
      "id": "63e6a3007a58c92799c84240",
      "name": "Ali"
    }
  },
  {
    "status": "PAID",
    "invoiceDate": ISODate("2023-06-15T00:00:00.000Z"),
    "reference": "QP83JVAR",
    "amount": 7205,
    "customer": {
      "id": "6487545cb89dc4aabc14d89d",
      "name": "Alan"
    }
  },
  {
    "status": "PAID",
    "invoiceDate": ISODate("2023-06-10T00:00:00.000Z"),
    "reference": "98A60RKN",
    "amount": 13895,
    "customer": {
      "id": "6487546769ea02a11f345973",
      "name": "Kristen"
    }
  }
]

Users want to see a dashboard showing the total amount of invoices by status over a defined period.

So the final result of the UI looks like this:

In the MongoDB shell, the aggregation looks like this:

db.getCollection('invoice').aggregate([
  // Stage 1
  {
    "$match": {
      "invoiceDate": {
        $gte: ISODate("2023-01-01T00:00:00.000Z"),
        $lte: ISODate("2023-06-30T00:00:00.000Z")
      }
    }
  },
  // Stage 2
  {
    $group: {
      _id: null,
      total: {
        $sum: "$amount"
      },
      docs: {
        $push: "$$ROOT"
      }
    }
  },
  // Stage 3
  {
    $project: {
      _id: 0,
      total: 1,
      paid: {
        $filter: {
          input: "$docs",
          as: "doc",
          cond: {
            $eq: [
              "$$doc.status",
              "PAID"
            ]
          }
        }
      },
      canceled: {
        $filter: {
          input: "$docs",
          as: "doc",
          cond: {
            $eq: [
              "$$doc.status",
              "CANCELED"
            ]
          }
        }
      },
      pending: {
        $filter: {
          input: "$docs",
          as: "doc",
          cond: {
            $eq: [
              "$$doc.status",
              "PENDING"
            ]
          }
        }
      }
    }
  },
  // Stage 4
  {
    $project: {
      total: 1,
      paid: {
        $sum: "$paid.amount"
      },
      canceled: {
        $sum: "$canceled.amount"
      },
      pending: {
        $sum: "$pending.amount"
      }
    }
  }
])

Output on mongoplayground.net: https://mongoplayground.net/p/QfZM9wHKkw1
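As a sanity check on what the four stages should produce for the sample documents, the following plain Java sketch recomputes the summary in memory. It does not talk to MongoDB; the InvoiceSummaryCheck class, its records, and its method names are hypothetical and exist only for this illustration.

```java
import java.time.LocalDate;
import java.util.List;

final class InvoiceSummaryCheck {

    // Hypothetical in-memory view of the fields the pipeline reads.
    record Invoice(String status, LocalDate invoiceDate, long amount) {}

    record Summary(long total, long paid, long canceled, long pending) {}

    // The eleven sample invoices from the collection shown earlier.
    static final List<Invoice> SAMPLE = List.of(
            new Invoice("CANCELED", LocalDate.parse("2023-05-30"), 1730),
            new Invoice("PENDING", LocalDate.parse("2023-02-10"), 6555),
            new Invoice("PENDING", LocalDate.parse("2023-02-10"), 2100),
            new Invoice("PAID", LocalDate.parse("2023-02-10"), 21200),
            new Invoice("PENDING", LocalDate.parse("2023-02-10"), 15010),
            new Invoice("PENDING", LocalDate.parse("2023-02-10"), 6335),
            new Invoice("PAID", LocalDate.parse("2023-02-10"), 18200),
            new Invoice("CANCELED", LocalDate.parse("2023-02-10"), 2300),
            new Invoice("CANCELED", LocalDate.parse("2023-02-10"), 6770),
            new Invoice("PAID", LocalDate.parse("2023-06-15"), 7205),
            new Invoice("PAID", LocalDate.parse("2023-06-10"), 13895));

    static Summary summarize(List<Invoice> invoices, LocalDate from, LocalDate to) {
        // Stage 1 ($match): keep invoices dated within [from, to].
        List<Invoice> docs = invoices.stream()
                .filter(i -> !i.invoiceDate().isBefore(from) && !i.invoiceDate().isAfter(to))
                .toList();

        // Stage 2 ($group with _id: null): a single group holding every matched document.
        long total = docs.stream().mapToLong(Invoice::amount).sum();

        // Stages 3 and 4 ($filter, then $sum): one subtotal per status.
        return new Summary(total,
                sumFor(docs, "PAID"), sumFor(docs, "CANCELED"), sumFor(docs, "PENDING"));
    }

    private static long sumFor(List<Invoice> docs, String status) {
        return docs.stream()
                .filter(i -> i.status().equals(status))
                .mapToLong(Invoice::amount)
                .sum();
    }

    public static void main(String[] args) {
        // Prints: Summary[total=101300, paid=60500, canceled=10800, pending=30000]
        System.out.println(summarize(SAMPLE,
                LocalDate.parse("2023-01-01"), LocalDate.parse("2023-06-30")));
    }
}
```

One difference is worth noting: when no invoice matches, this sketch returns a summary of zeros, whereas the MongoDB pipeline returns no document at all, because $group emits nothing for empty input.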

Now we are going to build a REST API that provides this result with Spring Data Reactive MongoDB.

Project Setup

We will start by creating a simple Spring Boot project from start.spring.io, with the following dependencies: Spring Reactive Web, Spring Data Reactive MongoDB, Lombok, and Validation.

Invoice Entity Class

@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
@EqualsAndHashCode
@Document(collection="invoice")
public class Invoice {

    @Id
    private String id;

    @NotNull
    @Size(min = 1, max = 8)
    private String reference;

    /**
     * When this invoice was generated
     */
    private LocalDateTime invoiceDate;

    private Customer customer;

    // Summed by the aggregation ('$amount'); the original listing omits this field, so the type is an assumption
    private double amount;

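    // Without @Builder.Default, Lombok's builder would ignore this initializer and leave status null
    @Builder.Default
    private InvoiceStatusEnum status = InvoiceStatusEnum.PENDING;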
}

InvoiceCountSummary projection

InvoiceCountSummary is a record that is used to map aggregation results.

public record InvoiceCountSummary(double total, double paid, double canceled, double pending) {}

With @Aggregation Annotation

The @Aggregation annotation can be placed on a repository query method so that the stages declared in its pipeline attribute run when the method is invoked. Pipeline stages are mapped against the repository domain type to take field mappings into account, and they may contain simple placeholders such as ?0 as well as SpEL expressions.

@Repository
public interface InvoiceRepository extends ReactiveMongoRepository<Invoice, String> {

    @Aggregation(pipeline = {
            "{$match: {'invoiceDate': {$gte: ?0, $lte: ?1 }}}",
            "{$group: {_id: null,total: {$sum: '$amount'},docs: {$push: '$$ROOT'}}}",
            "{$project: {_id: 0,total: 1, paid: { $filter: { input: '$docs', as: 'doc',cond: {$eq: ['$$doc.status','PAID']}}},canceled: { $filter: { input: '$docs',as: 'doc',cond: {$eq: ['$$doc.status','CANCELED']}}}, pending: { $filter: { input: '$docs',as: 'doc',cond: {$eq: ['$$doc.status','PENDING']}}}}}",
            "{$project: { total: 1, paid: { $sum: '$paid.amount'}, canceled: {$sum: '$canceled.amount'}, pending: {$sum: '$pending.amount'}}}"
    })
    Mono<InvoiceCountSummary> findInvoiceSummary(LocalDate startDate, LocalDate endDate);
}

InvoiceService.java

@RequiredArgsConstructor
@Service
public class InvoiceService {

    private final InvoiceRepository invoiceRepository;

    public Mono<InvoiceCountSummary> repositoryAggregation(LocalDate dateFrom, LocalDate dateTo) {
        return invoiceRepository.findInvoiceSummary(dateFrom, dateTo)
                .switchIfEmpty(Mono.defer(() -> Mono.just(new InvoiceCountSummary(0, 0, 0, 0))));
    }
}

InvoiceHandler.java

@Component
@RequiredArgsConstructor
public class InvoiceHandler {

    private final InvoiceService invoiceService;

    public Mono<ServerResponse> reactiveMongoRepositoryAggregation(ServerRequest serverRequest) {

        LocalDate dateFrom = serverRequest.queryParam("startDate").map(this::validateInputDate).orElse(null);
        LocalDate dateTo = serverRequest.queryParam("endDate").map(this::validateInputDate).orElse(null);

        datesValidation(dateFrom, dateTo);

        return invoiceService.repositoryAggregation(dateFrom, dateTo)
                .flatMap(invoiceCountSummary ->
                        ServerResponse.status(HttpStatus.OK)
                                .bodyValue(invoiceCountSummary));
    }

    private static void datesValidation(LocalDate dateFrom, LocalDate dateTo) {
        if (dateFrom == null || dateTo == null){
            throw new ValidatorException("The date parameters are not valid, please verify");
        }
    }

    private LocalDate validateInputDate(String inputDate){
        try {
            return LocalDate.parse(inputDate);
        }catch (DateTimeParseException e){
            return null;
        }
    }

}
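The handler above signals an unparsable date by returning null. As an alternative, here is a minimal sketch in plain Java that uses Optional instead and also rejects a range whose end precedes its start. The DateRangeParams class, the DateRange record, and the method names are hypothetical. The sketch additionally computes an exclusive upper bound at the start of the following day. That part rests on an assumption: since invoiceDate is a LocalDateTime, a date-only upper bound compared with $lte would likely match only up to midnight at the start of the end date, so a query using this range would compare with $lt against toExclusive instead.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;
import java.util.Optional;

final class DateRangeParams {

    // Half-open range [from, toExclusive) covering whole calendar days.
    record DateRange(LocalDateTime from, LocalDateTime toExclusive) {}

    // Parses an ISO-8601 date such as "2023-06-30"; empty when missing or malformed.
    static Optional<LocalDate> parseIsoDate(String raw) {
        if (raw == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(LocalDate.parse(raw));
        } catch (DateTimeParseException e) {
            return Optional.empty();
        }
    }

    // Builds the range, or returns empty when either date is invalid
    // or the end date is before the start date.
    static Optional<DateRange> toRange(String startDate, String endDate) {
        Optional<LocalDate> from = parseIsoDate(startDate);
        Optional<LocalDate> to = parseIsoDate(endDate);
        if (from.isEmpty() || to.isEmpty() || to.get().isBefore(from.get())) {
            return Optional.empty();
        }
        return Optional.of(new DateRange(
                from.get().atStartOfDay(),
                // Start of the next day, so the whole end date is included.
                to.get().plusDays(1).atStartOfDay()));
    }

    public static void main(String[] args) {
        System.out.println(toRange("2023-01-01", "2023-06-30"));
        System.out.println(toRange("30/06/2023", "2023-06-30").isPresent());
    }
}
```

In a handler, an empty result could be mapped to the same ValidatorException that datesValidation throws today.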

Aggregation With ReactiveMongoTemplate

Spring Data also supports the MongoDB Aggregation Framework through ReactiveMongoTemplate.

ReactiveMongoTemplate is the primary implementation of ReactiveMongoOperations. It simplifies the use of Reactive MongoDB and helps to avoid common errors.

We can rewrite our aggregation as follows:

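import static org.springframework.data.mongodb.core.aggregation.Aggregation.group;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.match;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.project;

import java.time.LocalDate;
import java.util.Arrays;

import lombok.RequiredArgsConstructor;
import org.bson.Document;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@RequiredArgsConstructor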
@Service
public class InvoiceService {

    private final ReactiveMongoTemplate reactiveMongoTemplate;

    public Mono<InvoiceCountSummary> mongoTemplateAggregation(LocalDate dateFrom, LocalDate dateTo) {
        var matchStage = match(Criteria.where("invoiceDate").gte(dateFrom).lte(dateTo));

        var groupStage = group().sum("amount").as("total").push(Aggregation.ROOT).as("docs");

        var firstProjectStage = project("total").andExclude("_id")
                .and(aggregationOperationContext -> filterConditions(InvoiceStatusEnum.PAID.name())).as("paid")
                .and(aggregationOperationContext -> filterConditions(InvoiceStatusEnum.CANCELED.name())).as("canceled")
                .and(aggregationOperationContext -> filterConditions(InvoiceStatusEnum.PENDING.name())).as("pending");

        var secondProjectStage = project("total")
                .and(aggregationOperationContext -> new Document("$sum", "$paid.amount")).as("paid")
                .and(aggregationOperationContext -> new Document("$sum", "$canceled.amount")).as("canceled")
                .and(aggregationOperationContext -> new Document("$sum", "$pending.amount")).as("pending");

        var aggregation = Aggregation.newAggregation(matchStage, groupStage, firstProjectStage, secondProjectStage);

        var result = reactiveMongoTemplate.aggregate(aggregation, Invoice.class, InvoiceCountSummary.class);
        return Mono.from(result)
                .switchIfEmpty(Mono.defer(() -> Mono.just(new InvoiceCountSummary(0,0,0,0))));
    }

    private Document filterConditions(String invoiceStatus) {
        Document filterExpression = new Document();
        filterExpression.put("input", "$docs");
        filterExpression.put("as", "doc");
        filterExpression.put("cond", new Document("$eq", Arrays.<Object>asList("$$doc.status", invoiceStatus)));
        return new Document("$filter", filterExpression);
    }
}

InvoiceHandler.java

@Component
@RequiredArgsConstructor
public class InvoiceHandler {

    private final InvoiceService invoiceService;


    public Mono<ServerResponse> reactiveMongoTemplateAggregation(ServerRequest serverRequest) {

        LocalDate dateFrom = serverRequest.queryParam("startDate").map(this::validateInputDate).orElse(null);
        LocalDate dateTo = serverRequest.queryParam("endDate").map(this::validateInputDate).orElse(null);

        datesValidation(dateFrom, dateTo);

        return invoiceService.mongoTemplateAggregation(dateFrom, dateTo)
                .flatMap(invoiceCountSummary ->
                        ServerResponse.status(HttpStatus.OK)
                                .bodyValue(invoiceCountSummary));
    }

    private static void datesValidation(LocalDate dateFrom, LocalDate dateTo) {
        if (dateFrom == null || dateTo == null){
            throw new ValidatorException("The date parameters are not valid, please verify");
        }
    }

    private LocalDate validateInputDate(String inputDate){
        try {
            return LocalDate.parse(inputDate);
        }catch (DateTimeParseException e){
            return null;
        }
    }
}

Testing

We are all done with our code. We can run our application and test it.

Both implementations (the @Aggregation annotation and ReactiveMongoTemplate) return the same result.

Conclusion

In this post, we have seen how to use the MongoDB Aggregation Framework in Spring Reactive.

MongoDB’s Aggregation Framework can be used to do even more than we demonstrated in the example of this story. We hope that this introduction leads you along the path of further research.

The complete source code is available on GitHub.

Thanks for reading!

References

👉 Link to Medium blog
