License

The library is open source, developed on GitHub, and licensed under the Apache License 2.0.

Pre-requisites

This library requires at least Java 11 (Java 21+ is recommended) and RabbitMQ 4.0 or later.

API Overview

Environment

The Environment is the entry point for applications. It exposes an API to create connections.

Environment creation
Environment environment = new AmqpEnvironmentBuilder()
    .build();

Connection

The Connection represents an AMQP 1.0 connection on top of TCP. It exposes an API to create publishers, consumers, and to manage resources (exchanges, queues, and bindings).

Connection creation
Connection connection = environment.connectionBuilder()
    .build();

Connection Settings

Connection settings (URI, credentials, etc.) can be set at the environment level. They are then reused for each connection. It is possible to override the environment connection settings for a given connection:

Connection settings at the environment and connection levels
Environment environment = new AmqpEnvironmentBuilder()
    .connectionSettings()
    .uri("amqp://guest:guest@localhost:5672/%2f") (1)
    .environmentBuilder().build();

Connection connection = environment.connectionBuilder()
    .uri("amqp://admin:admin@localhost:5672/%2f") (2)
    .build();
1 Use the guest user by default
2 Use the admin user for this connection
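
To make the URI format above more concrete, here is a minimal sketch that breaks such a URI into its parts with the JDK's java.net.URI. The AmqpUriParts class is hypothetical and is not part of the library; the library's own parsing may differ. The sketch assumes the URI always carries user:password credentials and a path. The trailing %2f is the percent-encoded form of /, the name of the default virtual host.

```java
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

// Illustrative helper only: it is not part of the client library.
final class AmqpUriParts {
  final String username;
  final String password;
  final String host;
  final int port;
  final String virtualHost;

  private AmqpUriParts(String username, String password, String host, int port, String virtualHost) {
    this.username = username;
    this.password = password;
    this.host = host;
    this.port = port;
    this.virtualHost = virtualHost;
  }

  // Assumes the URI carries "user:password@" credentials and a path segment.
  static AmqpUriParts parse(String value) {
    URI uri = URI.create(value);
    String[] credentials = uri.getRawUserInfo().split(":", 2);
    // The raw path is "/%2f": drop the leading slash, then percent-decode the rest.
    String virtualHost = decode(uri.getRawPath().substring(1));
    return new AmqpUriParts(
        decode(credentials[0]), decode(credentials[1]), uri.getHost(), uri.getPort(), virtualHost);
  }

  private static String decode(String raw) {
    // URLDecoder also turns '+' into a space, which is acceptable for this sketch.
    return URLDecoder.decode(raw, StandardCharsets.UTF_8);
  }
}
```

With the environment-level URI from the listing, parse(...) yields the guest user, host localhost, port 5672, and the virtual host "/".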

Publishing

Publisher creation
Publisher publisher = connection.publisherBuilder()
    .exchange("foo").key("bar")
    .build();
Message creation
Message message = publisher
    .message("hello".getBytes(StandardCharsets.UTF_8))
    .messageId(1L);
Message publishing
publisher.publish(message, context -> {
  if (context.status() == Publisher.Status.ACCEPTED) {
    // the broker accepted (confirmed) the message
  } else {
    // deal with possible failure
  }
});
Target address format: exchange and key
Publisher publisher = connection.publisherBuilder()
    .exchange("foo").key("bar") (1)
    .build();
1 Translates to /exchanges/foo/bar
Target address format: exchange
Publisher publisher = connection.publisherBuilder()
    .exchange("foo") (1)
    .build();
1 Translates to /exchanges/foo
Target address format: queue
Publisher publisher = connection.publisherBuilder()
    .queue("some-queue") (1)
    .build();
1 Translates to /queues/some-queue
Target address format: address in to field
Publisher publisher = connection.publisherBuilder()
    .build(); (1)

Message message1 = publisher.message()
    .toAddress().exchange("foo").key("bar") (2)
    .message();

Message message2 = publisher.message()
    .toAddress().exchange("foo") (3)
    .message();

Message message3 = publisher.message()
    .toAddress().queue("my-queue") (4)
    .message();
1 No address specified, translates to null
2 Exchange and key automatically set in message to field
3 Exchange automatically set in message to field
4 Queue automatically set in message to field
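
The translations listed in the callouts above follow a simple pattern, sketched here as plain string building. The TargetAddresses class is hypothetical and only mirrors the three address shapes shown in this section. It does not escape special characters in names, which a real implementation would likely need to do.

```java
// Illustrative helper only: mirrors the address shapes shown in the callouts above.
final class TargetAddresses {

  private TargetAddresses() {}

  // Exchange with an optional routing key (pass null for "exchange only").
  static String exchange(String exchange, String key) {
    String address = "/exchanges/" + exchange;
    return key == null ? address : address + "/" + key;
  }

  // Queue address.
  static String queue(String queue) {
    return "/queues/" + queue;
  }
}
```

For example, TargetAddresses.exchange("foo", "bar") returns /exchanges/foo/bar and TargetAddresses.queue("some-queue") returns /queues/some-queue.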

Consuming

Consuming from a queue
Consumer consumer = connection.consumerBuilder()
    .queue("some-queue")
    .messageHandler((context, message) -> {
      byte[] body = message.body(); (1)
      // ... (2)
      context.accept(); (3)
    })
    .build();
1 Get message body
2 Process the message
3 Accept (acknowledge) the message

Graceful Shutdown

A consumer can accept, discard, or requeue a message. We say the consumer settles a message.

Unsettled messages are requeued when a consumer gets closed. This can lead to duplicate processing of messages. Here is an example:

  • A consumer executes a database operation for a given message.

  • The consumer gets closed before it accepts (settles) the message.

  • The message is requeued.

  • Another consumer gets the message and executes the database operation again.

It is difficult to avoid duplicate messages completely, which is why processing should be idempotent. The consumer API nevertheless makes it possible to pause the delivery of messages, poll the number of unsettled messages until it reaches 0, and then close the consumer. This ensures the consumer has quiesced and all the received messages have been processed.
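
One common way to make processing idempotent is to remember the identifiers of messages that have already been handled and skip the operation when a duplicate shows up. The sketch below illustrates the idea with an in-memory set. The IdempotentProcessor class is hypothetical, and a real application would typically record the identifier durably, ideally in the same transaction as the database operation.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: in-memory de-duplication keyed by message ID.
final class IdempotentProcessor {

  private final Set<Object> processedIds = ConcurrentHashMap.newKeySet();

  // Returns true if the operation ran, false if this message ID was already processed.
  boolean process(Object messageId, Runnable operation) {
    if (!processedIds.add(messageId)) {
      return false; // duplicate delivery: skip the operation
    }
    try {
      operation.run();
      return true;
    } catch (RuntimeException e) {
      // Forget the ID so that a redelivery can retry the operation.
      processedIds.remove(messageId);
      throw e;
    }
  }
}
```

A message handler would call process(...) with the message identifier and accept the message in both cases, since a duplicate needs no further work.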

Here is an example of a consumer graceful shutdown:

Closing a consumer gracefully
consumer.pause(); (1)
long unsettledMessageCount = consumer.unsettledMessageCount(); (2)
consumer.close(); (3)
1 Pause the delivery of messages
2 Ensure the number of unsettled messages reaches 0
3 Close the consumer
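
The listing above reads the unsettled message count once. In practice the application has to wait until the count reaches 0, preferably with an upper bound on the wait. Here is a minimal sketch of such a loop. QuiescingConsumer is a hypothetical stand-in for the three consumer operations used above, and the polling interval and timeout handling are assumptions, not library behavior.

```java
import java.time.Duration;

// Hypothetical stand-in for the three consumer operations used in the listing above.
interface QuiescingConsumer {
  void pause();

  long unsettledMessageCount();

  void close();
}

final class GracefulShutdown {

  private GracefulShutdown() {}

  // Returns true if every message was settled before closing, false otherwise.
  static boolean closeGracefully(QuiescingConsumer consumer, Duration timeout) {
    consumer.pause();
    long deadline = System.nanoTime() + timeout.toNanos();
    try {
      // Poll until the consumer has quiesced or the timeout has elapsed.
      while (consumer.unsettledMessageCount() > 0 && System.nanoTime() - deadline < 0) {
        Thread.sleep(10);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt and close right away
    }
    boolean quiesced = consumer.unsettledMessageCount() == 0;
    consumer.close(); // remaining unsettled messages, if any, are requeued
    return quiesced;
  }
}
```

A false return value tells the caller that some messages were still unsettled at closing time and may be delivered again.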

It is also possible to simplify the consumer shutdown by just closing it, but this is likely to requeue unsettled messages.

Closing a consumer abruptly (can lead to duplicate messages)
consumer.close(); (1)
1 Close the consumer with potential unsettled messages

Support for Streams

There is out-of-the-box support for streams in consumer configuration.

It is possible to set where to attach to when consuming from a stream:

Attaching to the beginning of a stream
Consumer consumer = connection.consumerBuilder()
    .queue("some-stream")
    .stream() (1)
      .offset(ConsumerBuilder.StreamOffsetSpecification.FIRST) (2)
    .builder() (3)
    .messageHandler((context, message) -> {
      // message processing
    })
    .build();
1 Access to stream-specific configuration helper
2 Attach to the first offset in the stream
3 Go back to main consumer configuration

There is also support for stream filtering configuration:

Configuring stream filtering
Consumer consumer = connection.consumerBuilder()
    .queue("some-stream")
    .stream() (1)
      .filterValues("invoices", "orders") (2)
      .filterMatchUnfiltered(true) (3)
    .builder() (4)
    .messageHandler((context, message) -> {
      // message processing
    })
    .build();
1 Access to stream-specific configuration helper
2 Set one or several filter values
3 Get "unfiltered" messages as well
4 Go back to main consumer configuration

Consider also using the native stream protocol with the RabbitMQ stream Java client when working with streams.
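
The two filtering options shown earlier combine into a simple selection rule, sketched here as a plain predicate. This models only the intent stated in the callouts (a set of accepted filter values, plus an option to also get messages without a filter value). How the broker and the client actually apply filters is not described in this document, and the class and parameter names are hypothetical.

```java
import java.util.Set;

// Illustrative model of the selection rule expressed by the filtering options.
final class StreamFilterRule {

  private StreamFilterRule() {}

  // filterValue is null for a message published without a filter value ("unfiltered").
  static boolean matches(String filterValue, Set<String> acceptedValues, boolean matchUnfiltered) {
    if (filterValue == null) {
      return matchUnfiltered;
    }
    return acceptedValues.contains(filterValue);
  }
}
```

With the values from the listing ("invoices", "orders") and matchUnfiltered set to true, a message tagged "orders" and an untagged message are both selected, while a message tagged "shipping" is not.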

Resource Management

The Management object is the entry point to deal with resources. It is accessible from the Connection.

Getting the management object from the connection
Management management = connection.management();

Exchanges

Exchange creation (built-in type)
management.exchange()
    .name("my-exchange")
    .type(Management.ExchangeType.FANOUT)
    .declare();
Exchange creation (not built-in)
management.exchange()
    .name("my-exchange")
    .type("x-delayed-message")
    .autoDelete(false)
    .argument("x-delayed-type", "direct")
    .declare();
Exchange deletion
management.exchangeDeletion().delete("my-exchange");

Queues

Queue creation
management.queue()
    .name("my-queue")
    .exclusive(true)
    .autoDelete(false)
    .declare();

There is no need to remember the argument names and types thanks to explicit configuration methods:

Queue creation with arguments
management
    .queue()
    .name("my-queue")
    .messageTtl(Duration.ofMinutes(10)) (1)
    .maxLengthBytes(ByteCapacity.MB(100)) (1)
    .declare();
1 Explicit methods for common queue arguments
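
For reference, the explicit methods above correspond to the usual RabbitMQ optional queue arguments. The sketch below shows the equivalent argument map one would otherwise have to build by hand: x-message-ttl is expressed in milliseconds and x-max-length-bytes in bytes. The QueueArguments class is hypothetical, and the exact conversion done by the library is an assumption.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative helper: builds the raw argument map by hand.
final class QueueArguments {

  private QueueArguments() {}

  static Map<String, Object> of(Duration messageTtl, long maxLengthBytes) {
    Map<String, Object> arguments = new LinkedHashMap<>();
    arguments.put("x-message-ttl", messageTtl.toMillis()); // milliseconds
    arguments.put("x-max-length-bytes", maxLengthBytes); // bytes
    return arguments;
  }
}
```

For a TTL of 10 minutes, the map contains x-message-ttl set to 600000.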
Quorum queue creation
management
    .queue()
    .name("my-quorum-queue")
    .quorum() (1)
      .quorumInitialGroupSize(3)
      .deliveryLimit(3)
    .queue()
    .declare();
1 Queue-type-specific configuration helper
Queue deletion
management.queueDeletion().delete("my-queue");

Bindings

Exchange-to-queue binding
management.binding()
    .sourceExchange("my-exchange")
    .destinationQueue("my-queue")
    .key("foo")
    .bind();
Exchange-to-exchange binding
management.binding()
    .sourceExchange("my-exchange")
    .destinationExchange("my-other-exchange")
    .key("foo")
    .bind();
Unbinding
management.unbind()
    .sourceExchange("my-exchange")
    .destinationQueue("my-queue")
    .key("foo")
    .unbind();

Lifecycle Listeners

It is possible to add one or several listeners when creating a Connection instance:

Setting a listener on a connection
Connection connection = environment.connectionBuilder()
    .listeners(context -> { (1)
      context.previousState(); (2)
      context.currentState(); (3)
      context.failureCause(); (4)
      context.resource(); (5)
    })
    .build();
1 Setting a listener
2 The previous state of the connection
3 The current (new) state of the connection
4 The cause of the failure (in case of failure)
5 The connection instance (as a Resource)

It is also possible to set listeners on publisher instances:

Setting a listener on a publisher
Publisher publisher = connection.publisherBuilder()
    .listeners(context -> { (1)
      // ...
    })
    .exchange("foo").key("bar")
    .build();
1 Setting a listener

And on consumer instances as well:

Setting a listener on a consumer
Consumer consumer = connection.consumerBuilder()
    .listeners(context -> { (1)
      // ...
    })
    .queue("my-queue")
    .build();
1 Setting a listener

The listener API is the same for connections, publishers, and consumers. They all implement the Resource marker interface. The StateListener interface has only one method that takes a Context argument. See the snippet above for the information available in the Context.

A resource State can have the following values: OPENING, OPEN, RECOVERING, CLOSING, CLOSED.

Recovery

Automatic connection recovery is activated by default: the client automatically restores a connection after an unexpected closing (e.g. network glitch or node restart). Automatic topology recovery is also activated whenever connection recovery is: the client recreates AMQP entities, as well as publishers and consumers, for the recovering connection. Developers have less to worry about regarding network stability and node restarts, as the client takes care of them.

The client tries to reconnect every 5 seconds until it succeeds. It is possible to change this behavior by providing a custom BackOffDelayPolicy:

Setting a back-off policy for connection recovery
Connection connection = environment.connectionBuilder()
    .recovery() (1)
    .backOffDelayPolicy(BackOffDelayPolicy.fixed(Duration.ofSeconds(2))) (2)
    .connectionBuilder().build();
1 Configure recovery
2 Set the back-off delay policy
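
A back-off delay policy is essentially a function from the recovery attempt number to the delay to wait before that attempt. The sketch below models this idea with a plain IntFunction and shows a fixed policy next to an exponential one with an upper bound. BackOffSketch is hypothetical and does not implement the library's BackOffDelayPolicy interface; the zero-based attempt numbering is an assumption.

```java
import java.time.Duration;
import java.util.function.IntFunction;

// Illustrative model of delay policies: attempt number (starting at 0) -> delay.
final class BackOffSketch {

  private BackOffSketch() {}

  // Always wait the same delay between attempts.
  static IntFunction<Duration> fixed(Duration delay) {
    return attempt -> delay;
  }

  // Double the delay at each attempt, never exceeding max.
  static IntFunction<Duration> exponential(Duration initial, Duration max) {
    return attempt -> {
      // Clamp the exponent so that the shift stays well within the range of a long.
      long factor = 1L << Math.max(0, Math.min(attempt, 30));
      Duration delay = initial.multipliedBy(factor);
      return delay.compareTo(max) > 0 ? max : delay;
    };
  }
}
```

With an initial delay of 1 second and a maximum of 30 seconds, the exponential policy yields 1, 2, 4, 8, and 16 seconds for the first five attempts, then stays at 30 seconds.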

It is also possible to deactivate topology recovery if it is not appropriate for a given application. The application would usually register a connection lifecycle listener to know when the connection is recovered and recover its own state accordingly.

Deactivating topology recovery
Connection connection = environment.connectionBuilder()
    .recovery()
    .topology(false) (1)
    .connectionBuilder()
    .listeners(context -> {
      // restore the application state here (2)
    })
    .build();
1 Deactivate topology recovery
2 Restore application state when connection is recovered
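
The listener body in the listing above is left empty. A possible way to fill it is to react only to the transition that signals a completed recovery. The sketch below assumes that a recovered connection goes from RECOVERING back to OPEN; this is an assumption based on the list of states given earlier, not something this document states. ResourceState, StateChange, and RecoveryDetector are hypothetical stand-ins for the library's state and listener context types.

```java
import java.util.function.Consumer;

// Hypothetical mirror of the resource states listed in this document.
enum ResourceState { OPENING, OPEN, RECOVERING, CLOSING, CLOSED }

// Hypothetical stand-in for the listener context (previous and current state only).
final class StateChange {
  final ResourceState previous;
  final ResourceState current;

  StateChange(ResourceState previous, ResourceState current) {
    this.previous = previous;
    this.current = current;
  }
}

// Runs a callback each time a resource comes back from recovery.
final class RecoveryDetector implements Consumer<StateChange> {

  private final Runnable restoreState;

  RecoveryDetector(Runnable restoreState) {
    this.restoreState = restoreState;
  }

  @Override
  public void accept(StateChange change) {
    // Assumed transition for a successful recovery: RECOVERING -> OPEN.
    if (change.previous == ResourceState.RECOVERING && change.current == ResourceState.OPEN) {
      restoreState.run();
    }
  }
}
```

The initial OPENING to OPEN transition is ignored on purpose, so the callback only runs after an actual recovery.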

It is also possible to deactivate recovery altogether:

Deactivating recovery
Connection connection = environment.connectionBuilder()
    .recovery()
    .activated(false) (1)
    .connectionBuilder().build();
1 Deactivate recovery

Metrics Collection

The library provides the MetricsCollector abstraction to collect metrics. A MetricsCollector instance is set at the environment level and is called in several places by the library. The underlying implementation can then collect, compute and expose metrics.

The library provides the MicrometerMetricsCollector, an implementation based on Micrometer. Micrometer is a façade for observability systems: the user can choose the Micrometer meter registry for their favorite system.

Here is how to set up the Micrometer metrics collector and configure it for Prometheus:

Setting metrics collection with Prometheus
PrometheusMeterRegistry registry = new PrometheusMeterRegistry( (1)
    PrometheusConfig.DEFAULT
);
MetricsCollector collector = new MicrometerMetricsCollector(registry); (2)

Environment environment = new AmqpEnvironmentBuilder()
    .metricsCollector(collector) (3)
    .build();

Connection connection = environment.connectionBuilder().build(); (4)
1 Create the Micrometer meter registry for Prometheus
2 Create the metrics collector with the registry
3 Set the metrics collector in the environment builder
4 Create a connection (a Micrometer gauge is incremented)

The following metrics are recorded:

  • the number of open connections

  • the number of open publishers

  • the number of open consumers

  • the total number of published messages

  • the total number of accepted published messages

  • the total number of failed published messages

  • the total number of consumed messages

  • the total number of accepted consumed messages

  • the total number of requeued consumed messages

  • the total number of discarded consumed messages
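
The metrics in this list fall into two families: gauges that go up and down (open connections, publishers, consumers) and counters that only increase (everything starting with "total"). The sketch below shows the difference with a few in-memory values. InMemoryMetrics is hypothetical and does not implement the library's MetricsCollector interface; its method names are illustrative only.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative in-memory metrics holder covering a subset of the list above.
final class InMemoryMetrics {

  private final AtomicLong connections = new AtomicLong(); // gauge
  private final AtomicLong published = new AtomicLong(); // counter
  private final AtomicLong publishedAccepted = new AtomicLong(); // counter
  private final AtomicLong publishedFailed = new AtomicLong(); // counter

  // Gauge: incremented on open, decremented on close.
  void openConnection() { connections.incrementAndGet(); }

  void closeConnection() { connections.decrementAndGet(); }

  // Counters: only ever incremented.
  void publish() { published.incrementAndGet(); }

  void publishOutcome(boolean accepted) {
    (accepted ? publishedAccepted : publishedFailed).incrementAndGet();
  }

  long connections() { return connections.get(); }

  long published() { return published.get(); }

  long publishedAccepted() { return publishedAccepted.get(); }

  long publishedFailed() { return publishedFailed.get(); }
}
```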

Here is an example of the Prometheus output:

Example of the Prometheus output
# HELP rabbitmq_amqp_connections
# TYPE rabbitmq_amqp_connections gauge
rabbitmq_amqp_connections 1.0
# HELP rabbitmq_amqp_consumed_total
# TYPE rabbitmq_amqp_consumed_total counter
rabbitmq_amqp_consumed_total 3.0
# HELP rabbitmq_amqp_consumed_accepted_total
# TYPE rabbitmq_amqp_consumed_accepted_total counter
rabbitmq_amqp_consumed_accepted_total 1.0
# HELP rabbitmq_amqp_consumed_discarded_total
# TYPE rabbitmq_amqp_consumed_discarded_total counter
rabbitmq_amqp_consumed_discarded_total 1.0
# HELP rabbitmq_amqp_consumed_requeued_total
# TYPE rabbitmq_amqp_consumed_requeued_total counter
rabbitmq_amqp_consumed_requeued_total 1.0
# HELP rabbitmq_amqp_consumers
# TYPE rabbitmq_amqp_consumers gauge
rabbitmq_amqp_consumers 1.0
# HELP rabbitmq_amqp_published_total
# TYPE rabbitmq_amqp_published_total counter
rabbitmq_amqp_published_total 2.0
# HELP rabbitmq_amqp_published_accepted_total
# TYPE rabbitmq_amqp_published_accepted_total counter
rabbitmq_amqp_published_accepted_total 1.0
# HELP rabbitmq_amqp_published_failed_total
# TYPE rabbitmq_amqp_published_failed_total counter
rabbitmq_amqp_published_failed_total 1.0
# HELP rabbitmq_amqp_publishers
# TYPE rabbitmq_amqp_publishers gauge
rabbitmq_amqp_publishers 1.0
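
For quick checks in tests or scripts, output like the one above can be read back into a map of sample names and values. The following sketch is deliberately minimal: it skips comment lines and assumes each sample line has the form "name value", without labels containing spaces and without a trailing timestamp, as in the output above. PrometheusText is a hypothetical helper, not part of the library or of Micrometer.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative parser for simple "name value" sample lines.
final class PrometheusText {

  private PrometheusText() {}

  static Map<String, Double> parse(String text) {
    Map<String, Double> samples = new LinkedHashMap<>();
    for (String line : text.split("\\R")) {
      String trimmed = line.trim();
      if (trimmed.isEmpty() || trimmed.startsWith("#")) {
        continue; // skip blank lines and HELP/TYPE comments
      }
      int separator = trimmed.lastIndexOf(' ');
      if (separator < 0) {
        continue; // not a "name value" line
      }
      samples.put(
          trimmed.substring(0, separator).trim(),
          Double.valueOf(trimmed.substring(separator + 1)));
    }
    return samples;
  }
}
```

Applied to the output above, the resulting map associates rabbitmq_amqp_published_total with 2.0 and rabbitmq_amqp_connections with 1.0.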

Remote Procedure Call (RPC)

Remote procedure call with RabbitMQ consists of a client sending a request message and a server replying with a response message. Both the RPC client and server are client applications and the messages flow through the broker. The RPC client must send a reply-to queue address with the request. The RPC server uses this reply-to queue address to send the response. There must also be a way to correlate a request with its response; this is usually handled with a header that the RPC client and server agree on.

The library provides RPC client and server support classes. They use sensible defaults and some of the internal mechanics are configurable. They should meet the requirements of most RPC use cases. It is still possible to implement one part or the other with regular publishers and consumers for special cases, as this is what the RPC support classes do.
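
To illustrate the correlation mechanism described above, here is a minimal sketch of the bookkeeping an RPC client needs: it registers a pending future under a correlation ID when sending a request and completes that future when a reply with the same ID comes back. OutstandingRequests is hypothetical and uses plain strings instead of messages; it is not how the library's support classes are necessarily implemented.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative registry of requests waiting for their reply.
final class OutstandingRequests {

  private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

  // Called when a request is sent: remember the future under its correlation ID.
  CompletableFuture<String> register(String correlationId) {
    CompletableFuture<String> future = new CompletableFuture<>();
    pending.put(correlationId, future);
    return future;
  }

  // Called when a reply arrives: returns false if no request is waiting for this ID.
  boolean complete(String correlationId, String replyBody) {
    CompletableFuture<String> future = pending.remove(correlationId);
    return future != null && future.complete(replyBody);
  }
}
```

Replies can arrive in any order: each one completes only the future registered under its own correlation ID, and a reply with an unknown ID is ignored.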

Here is how to create an RPC server instance:

Creating an RPC server
RpcServer rpcServer = connection.rpcServerBuilder() (1)
    .requestQueue("rpc-server") (2)
    .handler((ctx, req) -> { (3)
      String in = new String(req.body(), UTF_8);
      String out = "*** " + in + " ***";
      return ctx.message(out.getBytes(UTF_8)); (4)
    }).build();
1 Use builder from connection
2 Set the queue to consume requests from (it must exist)
3 Define the processing logic
4 Create the reply message

Note the RPC server does not create the queue it waits for requests on. It must be created beforehand.

Here is how to create an RPC client:

Creating an RPC client
RpcClient rpcClient = connection.rpcClientBuilder() (1)
    .requestAddress().queue("rpc-server") (2)
    .rpcClient()
    .build();
1 Use builder from connection
2 Set the address to send request messages to

The RPC client will send its request to the configured destination. It can be an exchange or a queue, like in the example above.

Here is how to send a request:

Sending a request
Message request = rpcClient.message("hello".getBytes(UTF_8)); (1)
CompletableFuture<Message> replyFuture = rpcClient.publish(request); (2)
Message reply = replyFuture.get(10, TimeUnit.SECONDS); (3)
1 Create the message request
2 Send the request
3 Wait for the reply (synchronously)

The RpcClient#publish(Message) method returns a CompletableFuture<Message> that holds the reply message. It is then possible to wait for the reply asynchronously or synchronously.
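
The two ways of waiting rely only on the standard CompletableFuture API, as the following sketch shows. It uses a byte array as a stand-in for the reply message body, so ReplyHandling and its method names are hypothetical and independent from the library.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative helper: byte[] stands in for the body of the reply message.
final class ReplyHandling {

  private ReplyHandling() {}

  // Asynchronous style: returns immediately, the decoding runs when the reply arrives.
  static CompletableFuture<String> decodeAsync(CompletableFuture<byte[]> reply) {
    return reply.thenApply(body -> new String(body, StandardCharsets.UTF_8));
  }

  // Synchronous style: blocks the calling thread, up to the given timeout.
  static String decodeSync(CompletableFuture<byte[]> reply, long timeout, TimeUnit unit) {
    try {
      return new String(reply.get(timeout, unit), StandardCharsets.UTF_8);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for the reply", e);
    } catch (ExecutionException | TimeoutException e) {
      throw new IllegalStateException("No reply available", e);
    }
  }
}
```

The asynchronous style avoids blocking a thread per request, which matters when many requests are in flight. The synchronous style is simpler when the caller cannot proceed without the reply.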

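The RPC server uses the following defaults:

  • it extracts the correlation ID from the request message-id property.

  • it sets the reply correlation-id property to this correlation ID.

  • it sends the reply to the address found in the request reply-to property.

The RPC client uses the following defaults: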

  • it creates and waits for replies on an auto-delete, exclusive queue if no reply-to queue is set.

  • it uses a string-based correlation ID generator, with a fixed random UUID prefix and a strictly monotonically increasing sequence suffix ({UUID}-{sequence}, e.g. 6f839461-6b19-47e1-80b3-6be10d899d85-42). The prefix is different for each RpcClient instance and the suffix is incremented by one for each request.

  • it sets the request reply-to property to the reply-to queue address (defined by the user or created automatically, see above).

  • it sets the request message-id property to the generated correlation ID.

  • it extracts the correlation ID from the reply correlation-id property to correlate a reply with the appropriate request.
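
The default correlation ID scheme described in the list above ({UUID}-{sequence}) is cheap because the random part is generated only once per instance. Here is a minimal sketch of such a generator. PrefixedCorrelationIds is hypothetical, and starting the sequence at 0 is an assumption; the library's actual generator may differ in these details.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Illustrative generator: random prefix fixed per instance, incrementing suffix.
final class PrefixedCorrelationIds implements Supplier<String> {

  private final String prefix = UUID.randomUUID().toString(); // generated once
  private final AtomicLong sequence = new AtomicLong(); // assumed to start at 0

  @Override
  public String get() {
    return prefix + "-" + sequence.getAndIncrement();
  }

  String prefix() {
    return prefix;
  }
}
```

Each call to get() only increments a counter and concatenates two strings, so no random number generation happens on the request path.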

Let’s see how to customize some of the RPC support mechanics. Imagine the request message-id property is a critical piece of information and we do not want to use it as the correlation ID. The request can use the correlation-id property instead, and the RPC server just has to extract the correlation ID from this property (rather than from the message-id property, its default). Let’s also use a random UUID for the correlation ID generation (avoid doing this in production: it is fine in terms of uniqueness but not optimal in terms of performance, because randomness is not cheap).

Here is how to declare the RPC client:

Customizing the RPC client
String replyToQueue = connection.management().queue()
    .autoDelete(true).exclusive(true)
    .declare().name(); (1)
RpcClient rpcClient = connection.rpcClientBuilder()
    .correlationIdSupplier(UUID::randomUUID) (2)
    .requestPostProcessor((msg, corrId) ->
        msg.correlationId(corrId) (3)
           .replyToAddress().queue(replyToQueue).message()) (4)
    .replyToQueue(replyToQueue)
    .requestAddress().queue("rpc-server") (5)
    .rpcClient()
    .build();
1 Declare the reply-to queue
2 Use a random UUID as correlation ID
3 Use the correlation-id property for the request
4 Set the reply-to property
5 Set the address to send request messages to

We just have to tell the RPC server to get the correlation ID from the request correlation-id property:

Customizing the RPC server
RpcServer rpcServer = connection.rpcServerBuilder()
    .correlationIdExtractor(Message::correlationId) (1)
    .requestQueue("rpc-server")
    .handler((ctx, req) -> {
      String in = new String(req.body(), UTF_8);
      String out = "*** " + in + " ***";
      return ctx.message(out.getBytes(UTF_8));
    }).build();
1 Get the correlation ID from the request correlation-id property

Dependencies

Use your favorite build management tool to add the client dependencies to your project.

Maven

pom.xml
<dependencies>

  <dependency>
    <groupId>com.rabbitmq.client</groupId>
    <artifactId>amqp-client</artifactId>
    <version>0.1.0-SNAPSHOT</version>
  </dependency>

</dependencies>

Snapshots require declaring the appropriate repository.

Gradle

build.gradle
dependencies {
  implementation "com.rabbitmq.client:amqp-client:0.1.0-SNAPSHOT"
}

Snapshots require declaring the appropriate repository.

Snapshots

Releases are available from Maven Central, which does not require specific declaration. Snapshots are available from a repository which must be declared in the dependency management configuration.

With Maven:

Snapshot repository declaration for Maven
<repositories>

  <repository>
    <id>ossrh</id>
    <url>https://oss.sonatype.org/content/repositories/snapshots</url>
    <snapshots><enabled>true</enabled></snapshots>
    <releases><enabled>false</enabled></releases>
  </repository>

</repositories>

With Gradle:

Snapshot repository declaration for Gradle
repositories {
  maven { url 'https://oss.sonatype.org/content/repositories/snapshots' }
  mavenCentral()
}

Appendix A: Micrometer Observation

It is possible to use Micrometer Observation to instrument publishing and consuming in the AMQP Java client. Micrometer Observation provides metrics, tracing, and log correlation with one single API.

The AMQP Java client provides an ObservationCollector abstraction and an implementation for Micrometer Observation. The following snippet shows how to create and set up the Micrometer ObservationCollector implementation with an existing ObservationRegistry:

Configuring Micrometer Observation
Environment environment = new AmqpEnvironmentBuilder()
    .observationCollector(new MicrometerObservationCollectorBuilder()  (1)
        .registry(observationRegistry).build())  (2)
    .build();
1 Configure Micrometer ObservationCollector with builder
2 Set Micrometer ObservationRegistry

The next sections document the conventions, spans, and metrics made available by the instrumentation. They are automatically generated from the source code with the Micrometer documentation generator.

Observability - Conventions

Below you can find a list of all GlobalObservationConvention and ObservationConvention declared by this project.

Table 1. ObservationConvention implementations

ObservationConvention Class Name                                                     Applicable ObservationContext Class Name
-----------------------------------------------------------------------------------  ----------------------------------------
com.rabbitmq.client.amqp.observation.micrometer.DefaultProcessObservationConvention  DeliverContext
com.rabbitmq.client.amqp.observation.micrometer.DeliverObservationConvention         DeliverContext
com.rabbitmq.client.amqp.observation.micrometer.DefaultPublishObservationConvention  PublishContext
com.rabbitmq.client.amqp.observation.micrometer.PublishObservationConvention         PublishContext

Observability - Spans

Below you can find a list of all spans declared by this project.

Process Observation Span

Observation for processing a message.

Span name rabbitmq.amqp.process (defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultProcessObservationConvention).

Fully qualified name of the enclosing class com.rabbitmq.client.amqp.observation.micrometer.AmqpObservationDocumentation.

Table 2. Tag Keys

Name                             Description
-------------------------------  -----------------------------------------------------
messaging.operation (required)   A string identifying the kind of messaging operation.
messaging.system (required)      A string identifying the messaging system.
net.protocol.name (required)     A string identifying the protocol (AMQP).
net.protocol.version (required)  A string identifying the protocol version (1.0).

Publish Observation Span

Observation for publishing a message.

Span name rabbitmq.amqp.publish (defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultPublishObservationConvention).

Fully qualified name of the enclosing class com.rabbitmq.client.amqp.observation.micrometer.AmqpObservationDocumentation.

Table 3. Tag Keys

Name                             Description
-------------------------------  -----------------------------------------------------
messaging.operation (required)   A string identifying the kind of messaging operation.
messaging.system (required)      A string identifying the messaging system.
net.protocol.name (required)     A string identifying the protocol (AMQP).
net.protocol.version (required)  A string identifying the protocol version (1.0).

Observability - Metrics

Below you can find a list of all metrics declared by this project.

Process Observation

Observation for processing a message.

Metric name rabbitmq.amqp.process (defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultProcessObservationConvention). Type timer.

Metric name rabbitmq.amqp.process.active (defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultProcessObservationConvention). Type long task timer.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.

Micrometer internally uses nanoseconds for the base unit. However, each backend determines the actual base unit (e.g. Prometheus uses seconds).

Fully qualified name of the enclosing class com.rabbitmq.client.amqp.observation.micrometer.AmqpObservationDocumentation.

Table 4. Low cardinality Keys

Name                             Description
-------------------------------  -----------------------------------------------------
messaging.operation (required)   A string identifying the kind of messaging operation.
messaging.system (required)      A string identifying the messaging system.
net.protocol.name (required)     A string identifying the protocol (AMQP).
net.protocol.version (required)  A string identifying the protocol version (1.0).

Publish Observation

Observation for publishing a message.

Metric name rabbitmq.amqp.publish (defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultPublishObservationConvention). Type timer.

Metric name rabbitmq.amqp.publish.active (defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultPublishObservationConvention). Type long task timer.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.

Micrometer internally uses nanoseconds for the base unit. However, each backend determines the actual base unit (e.g. Prometheus uses seconds).

Fully qualified name of the enclosing class com.rabbitmq.client.amqp.observation.micrometer.AmqpObservationDocumentation.

Table 5. Low cardinality Keys

Name                             Description
-------------------------------  -----------------------------------------------------
messaging.operation (required)   A string identifying the kind of messaging operation.
messaging.system (required)      A string identifying the messaging system.
net.protocol.name (required)     A string identifying the protocol (AMQP).
net.protocol.version (required)  A string identifying the protocol version (1.0).