The RabbitMQ AMQP 1.0 Java Client is a library to communicate with RabbitMQ using AMQP 1.0.

It is for RabbitMQ only, it does not support other brokers.

Pre-requisites

This library requires at least Java 11 (Java 21+ is recommended) and RabbitMQ 4.0 or more.

License

The library is open source, developed on GitHub, and is licensed under the Apache Public License 2.0

Dependencies

Use your favorite build management tool to add the client dependencies to your project.

Maven

pom.xml
<dependencies>

  <dependency>
    <groupId>com.rabbitmq.client</groupId>
    <artifactId>amqp-client</artifactId>
    <version>0.8.0-SNAPSHOT</version>
  </dependency>

</dependencies>

Snapshots require to declare the appropriate repository.

Gradle

build.gradle
dependencies {
  compile "com.rabbitmq.client:amqp-client:0.8.0-SNAPSHOT"
}

Snapshots require to declare the appropriate repository.

Snapshots

Releases are available from Maven Central, which does not require specific declaration. Snapshots are available from a repository which must be declared in the dependency management configuration.

With Maven:

Snapshot repository declaration for Maven
<repositories>

  <repository>
    <id>central-portal-snapshots</id>
    <url>https://central.sonatype.com/repository/maven-snapshots/</url>
    <snapshots><enabled>true</enabled></snapshots>
    <releases><enabled>false</enabled></releases>
  </repository>

</repositories>

With Gradle:

Snapshot repository declaration for Gradle:
repositories {
  maven {
    name = 'Central Portal Snapshots'
    url = 'https://central.sonatype.com/repository/maven-snapshots/'
    // Only search this repository for the specific dependency
    content {
      includeModule("com.rabbitmq.client", "amqp-client")
    }
  }
  mavenCentral()
}

Versioning

The RabbitMQ AMQP 1.0 Java Client is in development and stabilization phase. When the stabilization phase ends, a 1.0.0 version will be cut, and semantic versioning is likely to be enforced.

Before reaching the stable phase, the client will use a versioning scheme of [0.MINOR.PATCH] where:

  • 0 indicates the project is still in a stabilization phase.

  • MINOR is a 0-based number incrementing with each new release cycle. It generally reflects significant changes like new features and potentially some programming interfaces changes.

  • PATCH is a 0-based number incrementing with each service release, that is bux fixes.

Breaking changes between releases can happen but will be kept to a minimum. The next section provides more details about the evolution of programming interfaces.

Stability of Programming Interfaces

The RabbitMQ AMQP 1.0 Java Client is in active development but its programming interfaces will remain as stable as possible. There is no guarantee though that they will remain completely stable, at least until it reaches version 1.0.0.

The client contains 2 sets of programming interfaces whose stability are of interest for application developers:

  • Application Programming Interfaces (API): those are the ones used to write application logic. They include the interfaces and classes in the com.rabbitmq.client.amqp package (e.g. Connection, Publisher, Consumer, Message). These API constitute the main programming model of the client and will be kept as stable as possible.

  • Service Provider Interfaces (SPI): those are interfaces to implement mainly technical behavior in the client. They are not meant to be used to implement application logic. Application developers may have to refer to them in the configuration phase and if they want to customize some internal behavior of the client. SPI packages and interfaces are marked as such in their Javadoc. These SPI are susceptible to change, but this should have no impact on most applications, as the changes are likely to be limited to the client internals.

Overview

The RabbitMQ team maintains a set of AMQP 1.0 client libraries designed and optimized for RabbitMQ. They offer a simple and safe, yet powerful API on top of AMQP 1.0. Applications can publish and consume messages with these libraries, as well as manage the server topology in a consistent way across programming languages. The libraries also offer advanced features like automatic connection and topology recovery, and connection affinity with queues.

The RabbitMQ AMQP 1.0 Java Client is one of these libraries. This documentation covers only the advanced usage of the library; a common page on the RabbitMQ website covers the most common use cases.

Advanced Usage

Connection Settings at the Environment Level

Connection settings (URI, credentials, etc.) can be set at the environment level. They will then be re-used for each connection. It is possible to override the environment connection settings for a given connection:

Connection settings at the environment and connection levels
Environment environment = new AmqpEnvironmentBuilder()
    .connectionSettings()
    .uri("amqp://guest:guest@localhost:5672/%2f") (1)
    .environmentBuilder().build();

Connection connection = environment.connectionBuilder()
    .uri("amqp://admin:admin@localhost:5672/%2f") (2)
    .build();
1 Use the guest user by default
2 Use the admin user for this connection

Settling Messages in Batch

Settling messages in batch
int batchSize = 10;
Consumer.MessageHandler handler = new Consumer.MessageHandler() {
  volatile Consumer.BatchContext batch = null;  (1)
  @Override
  public void handle(Consumer.Context context, Message message) {
    if (batch == null) {
      batch = context.batch(batchSize);  (2)
    }
    boolean success = process(message);
    if (success) {
      batch.add(context);  (3)
      if (batch.size() == batchSize) {
        batch.accept();  (4)
        batch = null;  (5)
      }
    } else {
      context.discard();  (6)
    }
  }
};
Consumer consumer = connection.consumerBuilder()
    .queue("some-queue")
    .messageHandler(handler)
    .build();
1 Declare batch context property
2 Create a new batch context instance
3 Add the current message context to the batch context if processing is successful
4 Settle the batch context once it contains 10 messages
5 Reset the batch context
6 Discard the current message context if processing fails

Subscription Listener

The client provides a SubscriptionListener interface callback to add behavior before a subscription is created. This callback is meant for stream consumers: it can be used to dynamically set the offset the consumer attaches to in the stream. It is called when the consumer is first created and when the client has to re-subscribe (e.g. after a disconnection).

It is possible to use the callback to get the last processed offset from an external store. The following code snippet shows how this can be done (note the interaction with the external store is not detailed):

Using the subscription listener to attach to a stream
connection.consumerBuilder()
    .queue("some-stream")
    .subscriptionListener(ctx -> {  (1)
      long offset = getOffsetFromExternalStore();  (2)
      ctx.streamOptions().offset(offset + 1);  (3)
    })
    .messageHandler((ctx, msg) -> {
      // message handling code...

      long offset = (long) msg.annotation("x-stream-offset");  (4)
      storeOffsetInExternalStore(offset);  (5)
    })
    .build();
1 Set subscription listener
2 Get offset from external store
3 Set offset to use for the subscription
4 Get the message offset
5 Store the offset in the external store after processing

OAuth 2 Support

The client can authenticate against an OAuth 2 server like UAA. It uses the OAuth 2 Client Credentials flow. The OAuth 2 plugin must be enabled on the server side and configured to use the same OAuth 2 server as the client.

How to retrieve the OAuth 2 token can be globally configured at the environment level:

Configuring OAuth 2 token retrieval
Environment environment = new AmqpEnvironmentBuilder()
    .connectionSettings().oauth2()  (1)
    .tokenEndpointUri("https://localhost:8443/uaa/oauth/token/")  (2)
    .clientId("rabbitmq").clientSecret("rabbitmq")  (3)
    .grantType("password")  (4)
    .parameter("username", "rabbit_super")  (5)
    .parameter("password", "rabbit_super")  (5)
    .tls().sslContext(sslContext).oauth2()  (6)
    .shared(true)  (7)
    .connection()
    .environmentBuilder().build();
1 Access the OAuth 2 configuration
2 Set the token endpoint URI
3 Authenticate the client application
4 Set the grant type
5 Set optional parameters (depends on the OAuth 2 server)
6 Set the SSL context (e.g. to verify and trust the identity of the OAuth 2 server)
7 The token can be shared across the environment connections

The environment retrieves tokens and uses them to create AMQP connections. It also takes care of refreshing the tokens before they expire and of re-authenticating existing connections so the broker does not close them when their token expires.

The environment uses the same token for all the connections it maintains by default, but this can be changed by setting the shared flag to false. With shared = false, each connection will have its own OAuth 2 token.

The OAuth 2 configuration can be set at the environment level but also at the connection level.

Metrics Collection

The library provides the MetricsCollector abstraction to collect metrics. A MetricsCollector instance is set at the environment level and is called in several places by the library. The underlying implementation can then collect, compute and expose metrics.

The library provides the MicrometerMetricsCollector, an implementation based on Micrometer. Micrometer is a façade for observability systems: the user can choose the Micrometer meter registry for their favorite system.

Here is how to set up the Micrometer metrics collector and configure it for Prometheus:

Setting metrics collection with Prometheus
PrometheusMeterRegistry registry = new PrometheusMeterRegistry( (1)
    PrometheusConfig.DEFAULT
);
MetricsCollector collector = new MicrometerMetricsCollector(registry); (2)

Environment environment = new AmqpEnvironmentBuilder()
    .metricsCollector(collector) (3)
    .build();

Connection connection = environment.connectionBuilder().build(); (4)
1 Create the Micrometer meter registry for Prometheus
2 Create the metrics collector with the registry
3 Set the metrics collector in the environment builder
4 Create a connection, a Micrometer gauge is incremented

The following metrics are recorded:

  • the number of open connections

  • the number of open publishers

  • the number of open consumers

  • the total number of published messages

  • the total number of accepted published messages

  • the total number of rejected published messages

  • the total number of released published messages

  • the total number of consumed messages

  • the total number of accepted consumed messages

  • the total number of requeued consumed messages

  • the total number of discarded consumed messages

Here is an example of the Prometheus output:

Example of the Prometheus output
# HELP rabbitmq_amqp_connections
# TYPE rabbitmq_amqp_connections gauge
rabbitmq_amqp_connections 1.0
# HELP rabbitmq_amqp_consumed_total
# TYPE rabbitmq_amqp_consumed_total counter
rabbitmq_amqp_consumed_total 3.0
# HELP rabbitmq_amqp_consumed_accepted_total
# TYPE rabbitmq_amqp_consumed_accepted_total counter
rabbitmq_amqp_consumed_accepted_total 1.0
# HELP rabbitmq_amqp_consumed_discarded_total
# TYPE rabbitmq_amqp_consumed_discarded_total counter
rabbitmq_amqp_consumed_discarded_total 1.0
# HELP rabbitmq_amqp_consumed_requeued_total
# TYPE rabbitmq_amqp_consumed_requeued_total counter
rabbitmq_amqp_consumed_requeued_total 1.0
# HELP rabbitmq_amqp_consumers
# TYPE rabbitmq_amqp_consumers gauge
rabbitmq_amqp_consumers 1.0
# HELP rabbitmq_amqp_published_total
# TYPE rabbitmq_amqp_published_total counter
rabbitmq_amqp_published_total 2.0
# HELP rabbitmq_amqp_published_accepted_total
# TYPE rabbitmq_amqp_published_accepted_total counter
rabbitmq_amqp_published_accepted_total 1.0
# HELP rabbitmq_amqp_published_rejected_total
# TYPE rabbitmq_amqp_published_rejected_total counter
rabbitmq_amqp_published_rejected_total 1.0
# HELP rabbitmq_amqp_published_released_total
# TYPE rabbitmq_amqp_published_released_total counter
rabbitmq_amqp_published_released_total 1.0
# HELP rabbitmq_amqp_publishers
# TYPE rabbitmq_amqp_publishers gauge
rabbitmq_amqp_publishers 1.0

Request/Response

Request/response with RabbitMQ consists of a requester sending a request message and a responder replying with a response message. Both the requester and responder are client applications and the messages flow through the broker. The requester must send a reply-to queue address with the request. The responder uses this reply-to queue address to send the response. There must also be a way to correlate a request with its response, this is usually handled with a header that the requester and responder agree on.

The library provides requester and responder support classes. They use sensible defaults and some of the internal mechanics are configurable. They should meet the requirements of most request/response use cases. It is still possible to implement one part or the other with regular publishers and consumers for special cases, as this is what the request/response support classes do.

Here is how to create a responder instance:

Creating a responder
Responder responder = connection.responderBuilder() (1)
    .requestQueue("request-queue") (2)
    .handler((ctx, req) -> { (3)
        String in = new String(req.body(), UTF_8);
        String out = "*** " + in + " ***";
        return ctx.message(out.getBytes(UTF_8)); (4)
    }).build();
1 Use builder from connection
2 Set the queue to consume requests from (it must exist)
3 Define the processing logic
4 Create the reply message

Note that the responder does not create the queue it waits for requests on. It must be created beforehand.

Here is how to create a requester:

Creating a requester
Requester requester = connection.requesterBuilder() (1)
    .requestAddress().queue("request-queue") (2)
    .requester()
    .build();
1 Use builder from connection
2 Set the address to send request messages to

The requester will send its request to the configured destination. It can be an exchange or a queue, like in the example above.

Here is how to send a request:

Sending a request
Message request = requester.message("hello".getBytes(UTF_8)); (1)
CompletableFuture<Message> replyFuture = requester.publish(request); (2)
Message reply = replyFuture.get(10, TimeUnit.SECONDS); (3)
1 Create the message request
2 Send the request
3 Wait for the reply (synchronously)

The Requester#publish(Message) method returns a CompletableFuture<Message> that holds the reply message. It is then possible to wait for the reply asynchronously or synchronously.

The responder has the following behavior:

  • when receiving a message request, it calls the processing logic (handler), extracts the correlation ID, calls a reply post-processor if defined, and sends the reply message.

  • if all these operations succeed, the responder accepts the request message (settles it with the ACCEPTED outcome).

  • if any of these operations throws an exception, the responder discards the request message (the message is removed from the request queue and is dead-lettered if configured).

The responder uses the following defaults:

  • it uses the request message-id property for the correlation ID.

  • it assigns the correlation ID (so the request message-id by default) to the reply correlation-id property.

  • it assigns the request reply-to property to the reply to property if it is defined. This behavior is hardcoded, but it is possible to override it thanks to a reply post-processor.

The requester uses the following defaults:

  • it uses direct reply-to if available (RabbitMQ 4.2 or more) for replies if no reply-to queue is set (it falls back to an auto-delete, exclusive queue if direct reply-to is not available)

  • it uses a string-based correlation ID generator, with a fixed random UUID prefix and a strictly monotonic increasing sequence suffix ({UUID}-{sequence}, e.g. 6f839461-6b19-47e1-80b3-6be10d899d85-42). The prefix is different for each Requester instance and the suffix is incremented by one for each request.

  • it sets the request reply-to property to the reply-to queue address (defined by the user or created automatically, see above).

  • it sets the request message-id property to the generated correlation ID.

  • it extracts the correlation ID from the reply correlation-id property to correlate a reply with the appropriate request.

If the responder will perform expensive work, it can proactively check whether the requester is still present with the Context#isRequesterAlive(Message) method:

Checking the requester
Responder responder = connection.responderBuilder()
    .requestQueue("request-queue")
    .handler((ctx, req) -> {
        if (ctx.isRequesterAlive(req)) { (1)
          String in = new String(req.body(), UTF_8);
          String out = "*** " + in + " ***";
          return ctx.message(out.getBytes(UTF_8));
        } else {
          return null;
        }
    }).build();
1 Check if the requester is still connected

The responder can check the requester at any time - when a request comes in or even during a long processing - and react accordingly by stopping the processing. Note each call to Context#isRequesterAlive(Message) implies a request to the broker.

Let’s see how to customize some of the request/response support mechanics. Imagine the request message-id property is a critical piece of information and we do not want to use it as the correlation ID. The request can use the correlation-id property and the responder just has to extract the correlation ID from this property (instead of the message-id property by default). Let’s also use a random UUID for the correlation ID generation (avoid doing this in production: this is OK in terms of uniqueness but not optimal in terms of performance because randomness is not cheap).

Here is how to declare the requester:

Customizing the requester
String replyToQueue = connection.management().queue()
    .autoDelete(true).exclusive(true)
    .declare().name(); (1)
Requester requester = connection.requesterBuilder()
    .correlationIdSupplier(UUID::randomUUID) (2)
    .requestPostProcessor((msg, corrId) ->
        msg.correlationId(corrId) (3)
           .replyToAddress().queue(replyToQueue).message()) (4)
    .replyToQueue(replyToQueue)
    .requestAddress().queue("request-queue") (5)
    .requester()
    .build();
1 Declare the reply-to queue
2 Use a random UUID as correlation ID
3 Use the correlation-id property for the request
4 Set the reply-to property
5 Set the address to send request messages to

We just have to tell the responder to get the correlation ID from the request correlation-id property:

Customizing the responder
Responder responder = connection.responderBuilder()
    .correlationIdExtractor(Message::correlationId) (1)
    .requestQueue("request-queue")
    .handler((ctx, req) -> {
        String in = new String(req.body(), UTF_8);
        String out = "*** " + in + " ***";
        return ctx.message(out.getBytes(UTF_8));
    }).build();
1 Get the correlation ID from the request correlation-id property

Appendix A: Micrometer Observation

It is possible to use Micrometer Observation to instrument publishing and consuming in the AMQP Java client. Micrometer Observation provides metrics, tracing, and log correlation with one single API.

The AMQP Java client provides an ObservationCollector abstraction and an implementation for Micrometer Observation. The following snippet shows how to create and set up the Micrometer ObservationCollector implementation with an existing ObservationRegistry:

Configuring Micrometer Observation
Environment environment = new AmqpEnvironmentBuilder()
    .observationCollector(new MicrometerObservationCollectorBuilder()  (1)
        .registry(observationRegistry).build())  (2)
    .build();
1 Configure Micrometer ObservationCollector with builder
2 Set Micrometer ObservationRegistry

The next sections document the conventions, spans, and metrics made available by the instrumentation. They are automatically generated from the source code with the Micrometer documentation generator.

Observability - Conventions

Below you can find a list of all GlobalObservationConvention and ObservationConvention declared by this project.

Table 1. ObservationConvention implementations

ObservationConvention Class Name

Applicable ObservationContext Class Name

com.rabbitmq.client.amqp.observation.micrometer.DefaultProcessObservationConvention

DeliverContext

com.rabbitmq.client.amqp.observation.micrometer.DeliverObservationConvention

DeliverContext

com.rabbitmq.client.amqp.observation.micrometer.DefaultPublishObservationConvention

PublishContext

com.rabbitmq.client.amqp.observation.micrometer.PublishObservationConvention

PublishContext

Observability - Spans

Below you can find a list of all spans declared by this project.

Process Observation Span

Observation for processing a message.

Span name rabbitmq.amqp.process (defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultProcessObservationConvention).

Fully qualified name of the enclosing class com.rabbitmq.client.amqp.observation.micrometer.AmqpObservationDocumentation.

Table 2. Tag Keys

Name

Description

messaging.operation (required)

A string identifying the kind of messaging operation.

messaging.system (required)

A string identifying the messaging system.

net.protocol.name (required)

A string identifying the protocol (AMQP).

net.protocol.version (required)

A string identifying the protocol version (1.0).

Publish Observation Span

Observation for publishing a message.

Span name rabbitmq.amqp.publish (defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultPublishObservationConvention).

Fully qualified name of the enclosing class com.rabbitmq.client.amqp.observation.micrometer.AmqpObservationDocumentation.

Table 3. Tag Keys

Name

Description

messaging.operation (required)

A string identifying the kind of messaging operation.

messaging.system (required)

A string identifying the messaging system.

net.protocol.name (required)

A string identifying the protocol (AMQP).

net.protocol.version (required)

A string identifying the protocol version (1.0).

Observability - Metrics

Below you can find a list of all metrics declared by this project.

Process Observation

Observation for processing a message.

Metric name rabbitmq.amqp.process (defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultProcessObservationConvention). Type timer.

Metric name rabbitmq.amqp.process.active (defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultProcessObservationConvention). Type long task timer.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the baseunit. However, each backend determines the actual baseunit. (i.e. Prometheus uses seconds)

Fully qualified name of the enclosing class com.rabbitmq.client.amqp.observation.micrometer.AmqpObservationDocumentation.

Table 4. Low cardinality Keys

Name

Description

messaging.operation (required)

A string identifying the kind of messaging operation.

messaging.system (required)

A string identifying the messaging system.

net.protocol.name (required)

A string identifying the protocol (AMQP).

net.protocol.version (required)

A string identifying the protocol version (1.0).

Publish Observation

Observation for publishing a message.

Metric name rabbitmq.amqp.publish (defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultPublishObservationConvention). Type timer.

Metric name rabbitmq.amqp.publish.active (defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultPublishObservationConvention). Type long task timer.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the baseunit. However, each backend determines the actual baseunit. (i.e. Prometheus uses seconds)

Fully qualified name of the enclosing class com.rabbitmq.client.amqp.observation.micrometer.AmqpObservationDocumentation.

Table 5. Low cardinality Keys

Name

Description

messaging.operation (required)

A string identifying the kind of messaging operation.

messaging.system (required)

A string identifying the messaging system.

net.protocol.name (required)

A string identifying the protocol (AMQP).

net.protocol.version (required)

A string identifying the protocol version (1.0).