RabbitMQ Producer and Consumer with Reactor RabbitMQ and Spring WebFlux

In this post, we will learn how to implement a RabbitMQ producer and consumer with Reactor RabbitMQ and Spring WebFlux.

· Prerequisites
· Overview
∘ What is RabbitMQ?
∘ Why RabbitMQ?
∘ RabbitMQ key concepts?
∘ RabbitMQ AMQP 0-9-1 Model in Brief
· RabbitMQ Setup with Docker
· Getting Started
∘ Reactive API for RabbitMQ
∘ RabbitMQ Producer App
∘ RabbitMQ Consumer App
· Let’s try
· Conclusion
· References

Prerequisites

This is the list of all the prerequisites:

  • Spring Boot / WebFlux 3+
  • Maven 3.+
  • Java 17
  • Your favorite IDE (IntelliJ IDEA, Eclipse, NetBeans, VS Code)
  • Docker installed
  • Postman

Overview

What is RabbitMQ?

RabbitMQ is an open-source message broker (sometimes called a queue manager) that lets applications exchange messages asynchronously. Written in Erlang, it was originally designed for the Advanced Message Queuing Protocol (AMQP) and has since been extended to support other protocols, including STOMP and MQTT. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.

Why RabbitMQ?

  • It has an open-source ecosystem.
  • It’s lightweight and easy to deploy on-premises, on cloud environments, and your local machine.
  • It supports multiple messaging protocols.
  • It offers reliability, flexible routing, and multiple exchange types.
  • It supports many common programming languages (Java, Ruby, Python, JavaScript, PHP, Go, .NET, etc.).
  • It has an active community of users and developers who contribute to its development and provide support through forums and emails.
  • It provides various monitoring and management tools, such as the RabbitMQ Management plugin.

RabbitMQ key concepts?

Here are some important concepts to know before you start working with RabbitMQ.

  • Publisher (also called “Producer”): It is an application (or application instance) that publishes (produces) messages. The same application can also consume messages and thus be a consumer at the same time.
  • Consumer: An application (or application instance) consumes and acknowledges messages. The same application can also publish messages and thus be a publisher at the same time.
  • Queue: A queue in RabbitMQ is an ordered collection of messages. Messages are enqueued and dequeued (delivered to consumers) in a FIFO (“first in, first out”) manner.
  • Exchange: An AMQP entity where messages are sent. Exchanges take a message and route it into zero or more queues.
  • Binding: Bindings are rules that exchanges use (among other things) to route messages to queues. To instruct an exchange E to route messages to a queue Q, Q has to be bound to E. Bindings may have an optional routing key attribute used by some exchange types.
  • Stream: A persistent, replicated data structure that can do the same job as a queue: it buffers messages from producers for consumers to read.
  • Channel: A lightweight virtual connection multiplexed over the network connection with the broker.
  • Connection: A TCP connection between the application and the RabbitMQ broker.
  • Message: Information (binary blobs of data) that is sent from the producer to a consumer through RabbitMQ.
  • Virtual Hosts: It provides logical grouping and separation of resources.
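
To make the relationship between exchanges, bindings, and queues more concrete, here is a minimal in-memory sketch in plain Java. It is only a toy model and not RabbitMQ client code: the class ToyDirectExchange and its bind and publish methods are hypothetical names invented for this illustration. It assumes a direct-style exchange that routes on an exact routing key match, and it uses a plain java.util.Queue to stand in for a broker queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy model of a direct exchange; hypothetical names, not the RabbitMQ client API. */
final class ToyDirectExchange {

    // routing key -> queues bound to this exchange with that key
    private final Map<String, List<Queue<String>>> bindings = new HashMap<>();

    /** Binds a queue to this exchange under the given routing key. */
    void bind(Queue<String> queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queue);
    }

    /** Copies the message into every queue bound with this key; returns the number of queues reached. */
    int publish(String routingKey, String message) {
        List<Queue<String>> targets = bindings.getOrDefault(routingKey, List.of());
        targets.forEach(queue -> queue.add(message));
        return targets.size();
    }

    public static void main(String[] args) {
        ToyDirectExchange exchange = new ToyDirectExchange();
        Queue<String> inventoryQueue = new ArrayDeque<>();
        exchange.bind(inventoryQueue, "inventory");

        exchange.publish("inventory", "first");
        exchange.publish("inventory", "second");
        exchange.publish("unbound-key", "dropped"); // no binding, so routed to zero queues

        // The consumer side reads in FIFO order.
        System.out.println(inventoryQueue.poll());
        System.out.println(inventoryQueue.poll());
    }
}
```

A message published with a key that no binding uses reaches zero queues, which is why the binding step matters as much as declaring the queue itself.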

RabbitMQ AMQP 0-9-1 Model in Brief

AMQP 0-9-1 (Advanced Message Queuing Protocol) is a messaging protocol that enables conforming client applications to communicate with conforming messaging middleware brokers.

Messaging brokers receive messages from publishers and route them to consumers.

Messages are published to exchanges, which are often compared to post offices or mailboxes. Exchanges then distribute message copies to queues using rules (bindings). Then the broker either delivers messages to consumers subscribed to queues, or consumers fetch/pull messages from queues on demand.

AMQP 0-9-1 brokers provide four exchange types:

  • Direct (Default): A direct exchange delivers messages to queues based on the message routing key. It is ideal for the unicast routing of messages, although it can be used for multicast routing as well.
  • Fanout: A fanout exchange routes messages to all of the queues that are bound to it and the routing key is ignored. If N queues are bound to a fanout exchange, when a new message is published to that exchange a copy of the message is delivered to all N queues. Fanout exchanges are ideal for the broadcast routing of messages.
  • Topic: Topic exchanges route messages to one or many queues based on matching between a message routing key and the pattern that was used to bind a queue to an exchange. The topic exchange type is often used to implement various publish/subscribe pattern variations. Topic exchanges are commonly used for the multicast routing of messages.
  • Headers: A headers exchange is designed for routing on multiple attributes that are more easily expressed as message headers than a routing key. Headers exchanges ignore the routing key attribute. Instead, the attributes used for routing are taken from the headers attribute.
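
The pattern matching used by topic exchanges is easier to see in code. The sketch below is a plain Java illustration, not broker code: TopicMatcher is a hypothetical helper written for this post. It assumes the usual topic wildcards, where routing keys are dot-separated words, "*" stands for exactly one word, and "#" stands for zero or more words.

```java
/** Hypothetical helper that mimics topic-exchange pattern matching; not part of any RabbitMQ library. */
final class TopicMatcher {

    private TopicMatcher() {
    }

    /** Returns true when the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.split("\\.");
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(patternWords, 0, keyWords, 0);
    }

    private static boolean matches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length; // pattern exhausted: the key must be exhausted too
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point
            for (int next = k; next <= key.length; next++) {
                if (matches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (pattern[p].equals("*") || pattern[p].equals(key[k])) {
            return matches(pattern, p + 1, key, k + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("stock.*", "stock.created"));
        System.out.println(matches("stock.*", "stock.eu.created"));
        System.out.println(matches("stock.#", "stock.eu.created"));
    }
}
```

With these rules, a queue bound with "#" behaves like a fanout binding, and a pattern without wildcards behaves like a direct binding.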

RabbitMQ Setup with Docker

There are many ways to install RabbitMQ (Linux, Windows, macOS, Docker, Kubernetes, cloud, etc.). For this demo, we’ll use Docker containers.

# latest RabbitMQ 3.13
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management

Open the URL http://localhost:15672/

By default, the credentials are guest/guest

Getting Started

We will create two Spring projects (reactive-rabbitmq-producer and reactive-rabbitmq-consumer) from start.spring.io, with the following dependencies: Spring WebFlux, Spring for RabbitMQ, Lombok, and Validation.

Reactive API for RabbitMQ

Reactor RabbitMQ is a reactive API for RabbitMQ based on Reactor and RabbitMQ Java Client. Reactor RabbitMQ API enables messages to be published to RabbitMQ and consumed from RabbitMQ using functional APIs with non-blocking back-pressure and very low overheads. This enables applications using Reactor to use RabbitMQ as a message bus or streaming platform and integrate with other systems to provide an end-to-end reactive pipeline.

We need to include the dependency of Reactor RabbitMQ in the projects.

<dependency>
    <groupId>io.projectreactor.rabbitmq</groupId>
    <artifactId>reactor-rabbitmq</artifactId>
    <version>1.5.6</version>
</dependency>

The latest version of the Reactor RabbitMQ library is available in the Maven Central repository.

Then add the RabbitMQ properties in the application.properties/yaml file

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

RabbitMQ Producer App

Let’s start with the configuration class

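import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;

@Configuration
public class RabbitMQConfig {

    @Bean // (1)
    public Mono<Connection> connectionMono(RabbitProperties rabbitProperties) {
        var connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(rabbitProperties.getHost());
        connectionFactory.setPort(rabbitProperties.getPort());
        connectionFactory.setUsername(rabbitProperties.getUsername());
        connectionFactory.setPassword(rabbitProperties.getPassword());
        // The connection is opened on first subscription and then cached.
        return Mono.fromCallable(() -> connectionFactory.newConnection("reactor-rabbit")).cache();
    }

    @Bean // (2)
    public SenderOptions senderOptions(Mono<Connection> connectionMono) {
        return new SenderOptions()
                .connectionMono(connectionMono)
                .resourceManagementScheduler(Schedulers.boundedElastic());
    }

    @Bean // (3)
    public Sender sender(SenderOptions senderOptions) {
        return RabbitFlux.createSender(senderOptions);
    }

    @Bean // (4)
    public ObjectMapper objectMapper() {
        var objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
        return objectMapper;
    }

}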
  1. The RabbitMQ Connection bean is used to transport messages to the broker. It configures a ConnectionFactory from the Spring RabbitProperties and wraps the connection creation in a cached Mono, so the connection is opened once and then shared.
  2. The SenderOptions bean holds the connection Mono used to reach the broker and the Reactor Scheduler used by the Sender for resource management.
  3. The Sender bean is created with the options already configured in senderOptions.
  4. The ObjectMapper bean is used to serialize the input object into a JSON string.

The Sender is also able to declare and delete AMQP resources reactively. Sender has a declare* method for each type of resource (exchange, binding, and queue) and there’s also a respective *Specification class to describe each creation.

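import com.rabbitmq.client.Connection;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.io.IOException;
import java.util.Objects;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Mono;

@RequiredArgsConstructor
@Configuration
public class RabbitMQInit {

    // Must be final; otherwise @RequiredArgsConstructor does not inject it and close() fails with a NullPointerException.
    private final Mono<Connection> connectionMono;

    private final AmqpAdmin amqpAdmin;

    /**
     * Creates the exchange, queue, and binding on startup.
     */
    @PostConstruct
    public void init() {

        /* create exchanges */
        amqpAdmin.declareExchange(ExchangeBuilder.directExchange(AppConstant.EXCHANGE).build());

        /* create queue */
        amqpAdmin.declareQueue(new Queue(AppConstant.QUEUE, false, false, false));

        /* bind queues to exchanges */
        amqpAdmin.declareBinding(BindingBuilder
                .bind(new Queue(AppConstant.QUEUE))
                .to(new DirectExchange(AppConstant.EXCHANGE))
                .with(AppConstant.ROUTING_KEY)
        );
    }

    // Connection#close declares IOException, so the method has to propagate it.
    @PreDestroy
    public void close() throws IOException {
        Objects.requireNonNull(connectionMono.block()).close();
    }
}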

In this case, we created a direct exchange, a queue, and a binding with a routing key, here through Spring’s AmqpAdmin rather than the Sender’s declare* methods. When the Sender is no longer required, the instance can be closed. The underlying Connection is closed, as well as the default schedulers if none has been explicitly provided.

The Sender is now ready to send messages to RabbitMQ. At this point, a Sender instance has been created, but no connections to RabbitMQ have been made yet. The underlying Connection instance is created lazily when a first call is made to create a resource or to send messages.
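
The lazy behaviour described above, where nothing is opened until the first call and the same connection is reused afterwards, can be sketched without any RabbitMQ dependency. The example below is an analogy in plain Java, not how Reactor RabbitMQ is implemented: LazyResource is a hypothetical class written for this post, and the string "connection" merely stands in for a real Connection object.

```java
import java.util.function.Supplier;

/** Hypothetical helper: creates a value on first use and reuses it afterwards. */
final class LazyResource<T> implements Supplier<T> {

    private final Supplier<T> factory;
    private T value;

    LazyResource(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Runs the factory on the first call only; later calls return the cached value. */
    @Override
    public synchronized T get() {
        if (value == null) {
            value = factory.get();
        }
        return value;
    }

    public static void main(String[] args) {
        LazyResource<String> connection = new LazyResource<>(() -> {
            System.out.println("opening connection"); // runs once, on first use
            return "connection";
        });
        System.out.println("sender created, nothing opened yet");
        connection.get();
        connection.get(); // reuses the cached value, the factory is not called again
    }
}
```

In the configuration class, Mono.fromCallable(...).cache() plays a comparable role: the callable that opens the connection only runs when something first subscribes.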

Let’s now create a sequence of messages to send to RabbitMQ. Each outbound message to be sent to RabbitMQ is represented as an OutboundMessage. An OutboundMessage contains routing information (exchange to send to and routing key) as well as the message itself (properties and body).

A Flux<OutboundMessage> is created to hold the messages sent to RabbitMQ.

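import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.SerializationUtils; // assumption: Spring's core utility class
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.Sender;
import reactor.util.retry.Retry;

@Slf4j
@RequiredArgsConstructor
@Service
public class InventoryService {

    private final Sender sender;

    private final ObjectMapper mapper;

    /**
     * Sends a message to the queue with publisher confirms.
     */
    public Mono<Void> createInventory(StockInventory stockInventory) throws JsonProcessingException {
        String json = mapper.writeValueAsString(stockInventory);
        byte[] inventoryByteArray = SerializationUtils.serialize(json);
        // The default exchange ("") routes on the queue name, so the queue name is used as the routing key.
        Flux<OutboundMessage> outboundFlux = Flux.just(new OutboundMessage("", AppConstant.QUEUE, inventoryByteArray));

        log.info("Publish message: {}", stockInventory);
        return sender.sendWithPublishConfirms(outboundFlux)
                .subscribeOn(Schedulers.boundedElastic())
                .filter(outboundMessageResult -> !outboundMessageResult.isAck())
                .handle((result, sink) -> sink.error(new Exception("Publish was not acked")))
                .retryWhen(Retry.backoff(2, Duration.ofMillis(100)))
                .then();
    }

}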

Sender also offers the sendWithPublishConfirms method to send messages and receive publisher confirms to make sure the broker has taken into account the outbound messages.
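
One detail of the service above is worth spelling out: the JSON string is passed through SerializationUtils.serialize before it is sent. Assuming this is Spring's helper built on Java object serialization, the message body is not the raw JSON text but a serialized java.lang.String, so the consumer has to deserialize it the same way before parsing the JSON. The plain-JDK sketch below illustrates that round trip; PayloadRoundTrip and the sample payload are hypothetical and only mirror what such a helper does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration of a Java-serialization round trip for a JSON string. */
final class PayloadRoundTrip {

    private PayloadRoundTrip() {
    }

    /** Producer side: wraps the JSON string in a Java-serialized byte array. */
    static byte[] serialize(String json) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(json);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Consumer side: restores the JSON string from the message body. */
    static String deserialize(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return (String) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Unexpected payload type", e);
        }
    }

    public static void main(String[] args) {
        String json = "{\"number\":\"9d3055ad-1\",\"qty\":64.0}"; // sample payload for illustration
        byte[] body = serialize(json);
        System.out.println("serialized size: " + body.length);
        System.out.println("raw UTF-8 size:  " + json.getBytes(StandardCharsets.UTF_8).length);
        System.out.println("round trip ok:   " + json.equals(deserialize(body)));
    }
}
```

If the consumers are not all Java applications, sending the UTF-8 bytes of the JSON directly is usually the simpler choice, since any client can read them.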

InventoryController.java

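import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
@RequiredArgsConstructor
@RequestMapping("/inventory")
public class InventoryController {

    private final InventoryService inventoryService;

    @PostMapping
    @ResponseStatus(HttpStatus.ACCEPTED)
    public Mono<Void> createInventory(@RequestBody StockInventory stockInventory) throws JsonProcessingException {
        return inventoryService.createInventory(stockInventory);
    }
}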

RabbitMQ Consumer App

A Consumer configuration class

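import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ReceiverOptions;

@Configuration
public class RabbitMQConfig {

    @Bean
    public Mono<Connection> connectionMono(RabbitProperties rabbitProperties) {
        var connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(rabbitProperties.getHost());
        connectionFactory.setPort(rabbitProperties.getPort());
        connectionFactory.setUsername(rabbitProperties.getUsername());
        connectionFactory.setPassword(rabbitProperties.getPassword());
        // The connection is opened on first subscription and then cached.
        return Mono.fromCallable(() -> connectionFactory.newConnection("reactor-rabbit")).cache();
    }

    @Bean
    public ReceiverOptions receiverOptions(Mono<Connection> connectionMono) {
        return new ReceiverOptions()
                .connectionMono(connectionMono);
    }

    @Bean
    public Receiver receiver(ReceiverOptions receiverOptions) {
        return RabbitFlux.createReceiver(receiverOptions);
    }

    @Bean
    public ObjectMapper objectMapper() {
        var objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
        return objectMapper;
    }

}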

It’s almost the same content as the producer config class, but the messages stored in RabbitMQ queues are consumed using the reactive receiver reactor.rabbitmq.Receiver. Each instance of Receiver is associated with a single instance of Connection created by the options-provided ConnectionFactory.

A receiver is created with an instance of the receiver configuration options, reactor.rabbitmq.ReceiverOptions. The properties of ReceiverOptions contain the ConnectionFactory that creates connections to the broker and a Reactor Scheduler used for the connection creation.

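import com.rabbitmq.client.Connection;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.ExceptionHandlers;
import reactor.rabbitmq.Receiver;

@Slf4j
@RequiredArgsConstructor
@Component
public class ConsumerListener {

    private final Mono<Connection> connectionMono;

    private final InventoryService inventoryService;

    private final Receiver receiver;

    @PreDestroy
    public void close() throws IOException {
        Objects.requireNonNull(connectionMono.block()).close();
    }

    @PostConstruct
    public Disposable receiveMessage() {
        return receiver.consumeManualAck(AppConstant.QUEUE, consumeOptions())
                .publishOn(Schedulers.parallel())
                .filter(delivery -> Objects.nonNull(delivery.getBody()))
                .flatMap(this::consumer)
                .subscribe();
    }

    // Acknowledges the delivery when it was saved, rejects it (without requeue) otherwise.
    private Mono<?> consumer(AcknowledgableDelivery acknowledgableDelivery) {
        return inventoryService.saveInventory(acknowledgableDelivery)
                .doOnSuccess(consume -> {
                    if (Boolean.TRUE.equals(consume)) {
                        acknowledgableDelivery.ack();
                    } else {
                        acknowledgableDelivery.nack(false);
                    }
                })
                .onErrorResume(throwable -> {
                    log.error(">> Exception Error => ", throwable);
                    return Mono.fromRunnable(() -> acknowledgableDelivery.nack(false));
                });
    }

    // Retries a failed ack/nack for up to 20 seconds, waiting 500 ms between attempts.
    private ConsumeOptions consumeOptions() {
        return new ConsumeOptions().exceptionHandler(
                new ExceptionHandlers.RetryAcknowledgmentExceptionHandler(
                        Duration.ofSeconds(20),
                        Duration.ofMillis(500),
                        ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE
                )
        );
    }
}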

When using Receiver#consumeManualAck, acknowledgment is handled by the developer, who can do pretty much anything they want on acknowledgment failure.

AcknowledgableDelivery#ack and AcknowledgableDelivery#nack methods handle retry internally based on BiConsumer<Receiver.AcknowledgmentContext, Exception> exceptionHandler in the ConsumeOptions.
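
The three arguments passed to the retry handler in consumeOptions() (an overall timeout, a waiting time between attempts, and a predicate that decides which failures are worth retrying) describe a simple retry loop. The sketch below shows that idea in plain Java. It is not the library's implementation: RetryUntilTimeout and its run method are hypothetical names, and the real handler works on acknowledgment contexts rather than on a Runnable.

```java
import java.time.Duration;
import java.util.function.Predicate;

/** Hypothetical sketch of "retry until a timeout, waiting between attempts". */
final class RetryUntilTimeout {

    private RetryUntilTimeout() {
    }

    /**
     * Runs the action and retries it after retryable failures until the timeout would be exceeded.
     * Returns the number of attempts that were needed.
     */
    static int run(Runnable action, Duration timeout, Duration waitingTime,
                   Predicate<RuntimeException> retryable) {
        long deadline = System.nanoTime() + timeout.toNanos();
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                action.run();
                return attempts;
            } catch (RuntimeException e) {
                boolean timeLeft = deadline - System.nanoTime() > waitingTime.toNanos();
                if (!retryable.test(e) || !timeLeft) {
                    throw e; // not worth retrying, or out of time
                }
                sleep(waitingTime);
            }
        }
    }

    private static void sleep(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        int attempts = run(
                () -> {
                    if (++calls[0] < 3) {
                        throw new IllegalStateException("connection not recovered yet");
                    }
                },
                Duration.ofSeconds(2),
                Duration.ofMillis(50),
                e -> e instanceof IllegalStateException);
        System.out.println("succeeded after " + attempts + " attempts");
    }
}
```

The predicate is what keeps the loop from retrying failures that will never succeed, which is the role CONNECTION_RECOVERY_PREDICATE appears to play in the consumer above.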

Let’s try

Run the producer and consumer applications.

We use the Postman Collection Runner to send repeated requests to the producer with random payloads.

  • Producer:
-:26:13.431+02:00  INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService    : Publish message: StockInventory(id=354ecfda-9d62-4e65-8ad9-2549dd0a8b83, number=9d3055ad-1, qty=64.0)
-:26:13.466+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=118df477-81ba-4417-bbf1-c2c1be576148, number=c6273e41-9, qty=160.0)
-:26:13.491+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=a60f97b0-ed5e-4411-81d6-276a6a0d6258, number=3e622cfb-7, qty=128.0)
-:26:13.515+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=12df1149-052a-4337-b3fa-cec9aab7cfcb, number=ed6af5c3-2, qty=156.0)
-:26:13.539+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=87c43c9f-c5b8-4a26-89c2-4aa0a9edd9f4, number=a4d59594-9, qty=201.0)
-:26:13.563+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=67768182-432a-46b8-a28d-64994c87ca2e, number=66fa7144-4, qty=155.0)
-:26:13.585+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=60ea1e96-eb4d-42c8-89f3-a2174538509e, number=cc41046e-6, qty=219.0)
-:26:13.610+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=6a2085f5-bee7-4f5f-a874-1e2f2bc319ee, number=8e84d474-7, qty=137.0)
-:26:13.637+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=06960ede-0337-4610-a460-6d3604fdadc1, number=c3fd35e0-c, qty=136.0)
-:26:13.665+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=a7c30704-d1fe-4ae9-a5d3-f77bdf860481, number=01446e1f-5, qty=178.0)
-:26:13.689+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=b207a1d4-e18f-46f2-9fa4-410f9c9568c9, number=ba7a903b-1, qty=86.0)
-:26:13.713+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=d9d703cc-9e7d-4071-b9e5-c918225af624, number=efbb506f-f, qty=190.0)
-:26:13.750+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=4af63698-f754-4c8c-a82a-203b65d33694, number=ff6b8b53-0, qty=215.0)
-:26:13.793+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=cf02647a-e2a2-4d5b-9b47-c58e680b8bca, number=d0795193-0, qty=12.0)
-:26:13.854+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=22ccdb31-dd2c-4504-9954-b14996af00b0, number=ce7d8ff2-6, qty=240.0)
-:26:13.902+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=afc85710-7462-42f6-931c-46c30d3da9e2, number=0169c682-d, qty=174.0)
-:26:13.945+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=e8dd7ad8-e4a9-442f-b1d6-91487bfd8bd7, number=c700cbec-b, qty=73.0)
-:26:13.993+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=ee7c2ec4-670f-4b03-8676-c27dd79efa8d, number=f2e2a563-8, qty=10.0)
-:26:14.030+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=73f8c860-97ae-44df-a146-d6093252ead5, number=9f934f0b-6, qty=22.0)
-:26:14.059+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=87f352a5-e2d4-40d9-b4e0-c96f9e890c11, number=cfd9bf1b-e, qty=64.0)
-:26:14.102+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=7cb434b2-35f1-44fe-ac01-1f6a5242a696, number=b56ce501-c, qty=80.0)
-:26:14.139+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=8278052d-66ee-4f0b-868c-5487729a58c9, number=0cff69dc-f, qty=40.0)
-:26:14.167+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=2242336e-1d62-4863-a018-5450f1005834, number=0e67e9a5-d, qty=167.0)
-:26:14.190+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=5ef0e664-2372-40aa-859c-221ec9d488df, number=61d04026-f, qty=28.0)
-:26:14.212+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=83e8fbde-2e5d-4ac8-862b-73b21d67e193, number=3a760844-c, qty=203.0)
-:26:14.232+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=5171fd2e-8436-430c-a9a7-12b147ba1495, number=69cbe62a-6, qty=48.0)
-:26:14.258+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=ea30b9f8-dbbd-4062-b46f-dfb59b1f815a, number=9a7d3d8d-2, qty=80.0)
-:26:14.282+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=3e5c8b9e-0699-4aea-b6cd-cd0ebb6cca8a, number=ecc4b010-8, qty=74.0)
-:26:14.304+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=46ae2c0b-e899-432c-81c8-70a89c06afdb, number=a5ebd213-c, qty=170.0)
-:26:14.331+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=34fe0338-0bd7-4fe9-9896-a784a66b8011, number=3babbedc-7, qty=246.0)
-:26:14.359+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=a8606c0a-c41e-4979-a7df-171c2e27a61d, number=a93d0945-a, qty=25.0)
-:26:14.388+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=aa1122f1-27e5-4224-accb-e8d490f07db5, number=572eb79a-1, qty=185.0)
-:26:14.419+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=59e9fabf-71a7-4c36-82f2-a30ab05db7f1, number=04eeadbb-e, qty=117.0)
-:26:14.446+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=38a44870-22c7-4a0a-a96d-c7a08daecdef, number=937c206d-3, qty=73.0)
-:26:14.470+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=22f7f6ae-a20c-462d-95b7-040109ff105e, number=bfa8ba99-3, qty=103.0)
-:26:14.497+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=aea09f0b-4f9a-44f2-b877-6b5f2fa0b1fa, number=2b2662f8-c, qty=95.0)
-:26:14.521+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=c87ea80d-836a-4b9e-9675-2877b5fb8a07, number=c3fdf71b-c, qty=228.0)
-:26:14.543+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=f22b654b-be9e-4fd9-ba82-1330a1302a7b, number=b68cb213-5, qty=36.0)
-:26:14.576+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=f9776c3b-a3d6-4b32-a317-462a8ee29853, number=42047373-b, qty=173.0)
-:26:14.610+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=47594452-ca70-4585-8155-51154e52074b, number=64167891-c, qty=129.0)
-:26:14.643+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=0f930895-2ff0-4d05-932a-18390a006daf, number=09ac8874-4, qty=109.0)
-:26:14.669+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=84830a2f-65ab-4cfd-a120-a5a0f5072982, number=54ba181c-c, qty=209.0)
-:26:14.691+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=446841fc-b516-487c-a893-7e827e1c0fe1, number=cf580ead-e, qty=77.0)
-:26:14.720+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=25e8c20b-04ef-418d-8dc0-646b23b1fd2a, number=692ffbe5-6, qty=130.0)
-:26:14.750+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=dc1fa61e-879b-4959-968f-a69e259bb5d0, number=9848fd40-8, qty=248.0)
-:26:14.782+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=7c1cb915-a170-4d99-abdf-6249ecd87489, number=ae7d3910-2, qty=97.0)
-:26:14.816+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=35a21e4d-bd3c-475b-b260-8251672def62, number=ec4b2cfa-9, qty=36.0)
-:26:14.844+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=0e699d20-603b-4835-b600-51935102c82d, number=318d020a-b, qty=183.0)
-:26:14.869+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=ea28fbc1-5986-45be-b36c-24f0f8e4aa7c, number=6dc6623a-8, qty=199.0)
-:26:14.903+02:00 INFO 68008 --- [or-http-epoll-3] c.b.producer.service.InventoryService : Publish message: StockInventory(id=0217af7c-b247-4fe2-8538-1e218df3a62a, number=6d8b2546-1, qty=88.0)
  • Consumer
-:26:13.438+02:00  INFO 72502 --- [     parallel-1] c.b.consumer.service.InventoryService    : ######################################## Consumer DATA ####################################
-:26:13.438+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=354ecfda-9d62-4e65-8ad9-2549dd0a8b83, number=9d3055ad-1, qty=64.0)
-:26:13.471+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:13.472+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=118df477-81ba-4417-bbf1-c2c1be576148, number=c6273e41-9, qty=160.0)
-:26:13.497+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:13.498+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=a60f97b0-ed5e-4411-81d6-276a6a0d6258, number=3e622cfb-7, qty=128.0)
-:26:13.520+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:13.521+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=12df1149-052a-4337-b3fa-cec9aab7cfcb, number=ed6af5c3-2, qty=156.0)
-:26:13.544+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:13.544+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=87c43c9f-c5b8-4a26-89c2-4aa0a9edd9f4, number=a4d59594-9, qty=201.0)
-:26:13.567+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:13.568+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=67768182-432a-46b8-a28d-64994c87ca2e, number=66fa7144-4, qty=155.0)
-:26:13.589+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:13.590+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=60ea1e96-eb4d-42c8-89f3-a2174538509e, number=cc41046e-6, qty=219.0)
-:26:13.615+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:13.615+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=6a2085f5-bee7-4f5f-a874-1e2f2bc319ee, number=8e84d474-7, qty=137.0)
-:26:13.642+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:13.643+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=06960ede-0337-4610-a460-6d3604fdadc1, number=c3fd35e0-c, qty=136.0)
-:26:13.669+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:13.669+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=a7c30704-d1fe-4ae9-a5d3-f77bdf860481, number=01446e1f-5, qty=178.0)
-:26:13.693+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:13.693+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=b207a1d4-e18f-46f2-9fa4-410f9c9568c9, number=ba7a903b-1, qty=86.0)
-:26:13.729+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:13.730+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=d9d703cc-9e7d-4071-b9e5-c918225af624, number=efbb506f-f, qty=190.0)
-:26:13.754+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:13.755+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=4af63698-f754-4c8c-a82a-203b65d33694, number=ff6b8b53-0, qty=215.0)
-:26:13.799+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:13.799+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=cf02647a-e2a2-4d5b-9b47-c58e680b8bca, number=d0795193-0, qty=12.0)
-:26:13.864+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:13.865+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=22ccdb31-dd2c-4504-9954-b14996af00b0, number=ce7d8ff2-6, qty=240.0)
-:26:13.907+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:13.907+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=afc85710-7462-42f6-931c-46c30d3da9e2, number=0169c682-d, qty=174.0)
-:26:13.951+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:13.952+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=e8dd7ad8-e4a9-442f-b1d6-91487bfd8bd7, number=c700cbec-b, qty=73.0)
-:26:13.998+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:13.998+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=ee7c2ec4-670f-4b03-8676-c27dd79efa8d, number=f2e2a563-8, qty=10.0)
-:26:14.035+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.035+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=73f8c860-97ae-44df-a146-d6093252ead5, number=9f934f0b-6, qty=22.0)
-:26:14.064+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.065+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=87f352a5-e2d4-40d9-b4e0-c96f9e890c11, number=cfd9bf1b-e, qty=64.0)
-:26:14.112+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.112+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=7cb434b2-35f1-44fe-ac01-1f6a5242a696, number=b56ce501-c, qty=80.0)
-:26:14.143+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.144+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=8278052d-66ee-4f0b-868c-5487729a58c9, number=0cff69dc-f, qty=40.0)
-:26:14.171+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.171+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=2242336e-1d62-4863-a018-5450f1005834, number=0e67e9a5-d, qty=167.0)
-:26:14.193+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.194+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=5ef0e664-2372-40aa-859c-221ec9d488df, number=61d04026-f, qty=28.0)
-:26:14.216+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.216+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=83e8fbde-2e5d-4ac8-862b-73b21d67e193, number=3a760844-c, qty=203.0)
-:26:14.236+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.237+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=5171fd2e-8436-430c-a9a7-12b147ba1495, number=69cbe62a-6, qty=48.0)
-:26:14.261+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.262+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=ea30b9f8-dbbd-4062-b46f-dfb59b1f815a, number=9a7d3d8d-2, qty=80.0)
-:26:14.286+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.287+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=3e5c8b9e-0699-4aea-b6cd-cd0ebb6cca8a, number=ecc4b010-8, qty=74.0)
-:26:14.308+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.309+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=46ae2c0b-e899-432c-81c8-70a89c06afdb, number=a5ebd213-c, qty=170.0)
-:26:14.337+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.337+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=34fe0338-0bd7-4fe9-9896-a784a66b8011, number=3babbedc-7, qty=246.0)
-:26:14.364+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.365+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=a8606c0a-c41e-4979-a7df-171c2e27a61d, number=a93d0945-a, qty=25.0)
-:26:14.394+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.395+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=aa1122f1-27e5-4224-accb-e8d490f07db5, number=572eb79a-1, qty=185.0)
-:26:14.425+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.426+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=59e9fabf-71a7-4c36-82f2-a30ab05db7f1, number=04eeadbb-e, qty=117.0)
-:26:14.449+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.449+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=38a44870-22c7-4a0a-a96d-c7a08daecdef, number=937c206d-3, qty=73.0)
-:26:14.474+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.475+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=22f7f6ae-a20c-462d-95b7-040109ff105e, number=bfa8ba99-3, qty=103.0)
-:26:14.500+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.501+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=aea09f0b-4f9a-44f2-b877-6b5f2fa0b1fa, number=2b2662f8-c, qty=95.0)
-:26:14.524+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.524+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=c87ea80d-836a-4b9e-9675-2877b5fb8a07, number=c3fdf71b-c, qty=228.0)
-:26:14.549+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.549+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=f22b654b-be9e-4fd9-ba82-1330a1302a7b, number=b68cb213-5, qty=36.0)
-:26:14.582+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.583+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=f9776c3b-a3d6-4b32-a317-462a8ee29853, number=42047373-b, qty=173.0)
-:26:14.617+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.618+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=47594452-ca70-4585-8155-51154e52074b, number=64167891-c, qty=129.0)
-:26:14.648+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.649+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=0f930895-2ff0-4d05-932a-18390a006daf, number=09ac8874-4, qty=109.0)
-:26:14.673+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.673+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=84830a2f-65ab-4cfd-a120-a5a0f5072982, number=54ba181c-c, qty=209.0)
-:26:14.694+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.695+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=446841fc-b516-487c-a893-7e827e1c0fe1, number=cf580ead-e, qty=77.0)
-:26:14.725+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.725+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=25e8c20b-04ef-418d-8dc0-646b23b1fd2a, number=692ffbe5-6, qty=130.0)
-:26:14.756+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.757+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=dc1fa61e-879b-4959-968f-a69e259bb5d0, number=9848fd40-8, qty=248.0)
-:26:14.787+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.788+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=7c1cb915-a170-4d99-abdf-6249ecd87489, number=ae7d3910-2, qty=97.0)
-:26:14.820+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.821+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=35a21e4d-bd3c-475b-b260-8251672def62, number=ec4b2cfa-9, qty=36.0)
-:26:14.848+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.849+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=0e699d20-603b-4835-b600-51935102c82d, number=318d020a-b, qty=183.0)
-:26:14.883+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.883+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=ea28fbc1-5986-45be-b36c-24f0f8e4aa7c, number=6dc6623a-8, qty=199.0)
-:26:14.907+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : ######################################## Consumer DATA ####################################
-:26:14.908+02:00 INFO 72502 --- [ parallel-1] c.b.consumer.service.InventoryService : >> Inventory data: StockInventory(id=0217af7c-b247-4fe2-8538-1e218df3a62a, number=6d8b2546-1, qty=88.0)

Conclusion

Well done! In this post, we explored how to implement a RabbitMQ producer and consumer using Reactor RabbitMQ and Spring WebFlux.

The complete source code is available on GitHub.

Thanks for reading!

References

👉 Link to Medium blog
