The RabbitMQ AMQP 1.0 Java Client is a library to communicate with RabbitMQ using AMQP 1.0.

Pre-requisites

This library requires at least Java 11 (Java 21+ is recommended) and RabbitMQ 4.0 or later.

License

The library is open source, developed on GitHub, and licensed under the Apache License 2.0.

Dependencies

Use your favorite build management tool to add the client dependencies to your project.

Maven

pom.xml
<dependencies>

  <dependency>
    <groupId>com.rabbitmq.client</groupId>
    <artifactId>amqp-client</artifactId>
    <version>0.4.0-SNAPSHOT</version>
  </dependency>

</dependencies>

Using a snapshot version requires declaring the appropriate repository.

Gradle

build.gradle
dependencies {
  implementation "com.rabbitmq.client:amqp-client:0.4.0-SNAPSHOT"
}

Using a snapshot version requires declaring the appropriate repository.

Snapshots

Releases are available from Maven Central, which does not require any specific declaration. Snapshots are available from a repository that must be declared in the dependency management configuration.

With Maven:

Snapshot repository declaration for Maven
<repositories>

  <repository>
    <id>ossrh</id>
    <url>https://oss.sonatype.org/content/repositories/snapshots</url>
    <snapshots><enabled>true</enabled></snapshots>
    <releases><enabled>false</enabled></releases>
  </repository>

</repositories>

With Gradle:

Snapshot repository declaration for Gradle
repositories {
  maven { url 'https://oss.sonatype.org/content/repositories/snapshots' }
  mavenCentral()
}

Versioning

The RabbitMQ AMQP 1.0 Java Client is in a development and stabilization phase. When the stabilization phase ends, a 1.0.0 version will be cut, and semantic versioning is likely to be enforced.

Before reaching the stable phase, the client will use a versioning scheme of [0.MINOR.PATCH] where:

  • 0 indicates the project is still in a stabilization phase.

  • MINOR is a 0-based number incrementing with each new release cycle. It generally reflects significant changes like new features and potentially some programming interface changes.

  • PATCH is a 0-based number incrementing with each service release, that is, bug fixes.

Breaking changes between releases can happen but will be kept to a minimum. The next section provides more details about the evolution of programming interfaces.
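
As an illustration of the scheme above, here is a minimal sketch that parses a version string and checks whether it belongs to the stabilization phase. The ClientVersion class is hypothetical and is not part of the library; it only assumes versions look like 0.MINOR.PATCH with an optional qualifier such as -SNAPSHOT.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the client library
class ClientVersion {

  // MAJOR.MINOR.PATCH, optionally followed by a qualifier such as -SNAPSHOT
  private static final Pattern PATTERN =
      Pattern.compile("(\\d+)\\.(\\d+)\\.(\\d+)(-[A-Za-z0-9.]+)?");

  final int major;
  final int minor;
  final int patch;

  ClientVersion(int major, int minor, int patch) {
    this.major = major;
    this.minor = minor;
    this.patch = patch;
  }

  static ClientVersion parse(String version) {
    Matcher m = PATTERN.matcher(version);
    if (!m.matches()) {
      throw new IllegalArgumentException("Unexpected version: " + version);
    }
    return new ClientVersion(
        Integer.parseInt(m.group(1)),
        Integer.parseInt(m.group(2)),
        Integer.parseInt(m.group(3)));
  }

  // A leading 0 indicates the project is still in the stabilization phase
  boolean inStabilizationPhase() {
    return major == 0;
  }

  // Same MINOR: the two versions differ only by service (bug fix) releases
  boolean sameReleaseCycle(ClientVersion other) {
    return major == other.major && minor == other.minor;
  }

  public static void main(String[] args) {
    ClientVersion current = parse("0.4.0-SNAPSHOT");
    ClientVersion candidate = parse("0.5.0");
    System.out.println(current.inStabilizationPhase());      // true
    System.out.println(current.sameReleaseCycle(candidate)); // false
  }
}
```

A check like sameReleaseCycle can help decide whether an upgrade deserves a closer look at the release notes, since programming interface changes are more likely when MINOR changes.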

Stability of Programming Interfaces

The RabbitMQ AMQP 1.0 Java Client is in active development but its programming interfaces will remain as stable as possible. There is no guarantee though that they will remain completely stable, at least until it reaches version 1.0.0.

The client contains 2 sets of programming interfaces whose stability is of interest for application developers:

  • Application Programming Interfaces (API): those are the ones used to write application logic. They include the interfaces and classes in the com.rabbitmq.client.amqp package (e.g. Connection, Publisher, Consumer, Message). These APIs constitute the main programming model of the client and will be kept as stable as possible.

  • Service Provider Interfaces (SPI): those are interfaces to implement mainly technical behavior in the client. They are not meant to be used to implement application logic. Application developers may have to refer to them in the configuration phase and if they want to customize some internal behavior of the client. SPI packages and interfaces are marked as such in their Javadoc. These SPIs are subject to change, but this should have no impact on most applications, as the changes are likely to be limited to the client internals.

Overview

The RabbitMQ team maintains a set of AMQP 1.0 client libraries designed and optimized for RabbitMQ. They offer a simple and safe, yet powerful API on top of AMQP 1.0. Applications can publish and consume messages with these libraries, as well as manage the server topology in a consistent way across programming languages. The libraries also offer advanced features like automatic connection and topology recovery, and connection affinity with queues.

The RabbitMQ AMQP 1.0 Java Client is one of these libraries. This documentation covers only the advanced usage of the library; a common page on the RabbitMQ website covers the most common use cases.

Advanced Usage

Connection Settings at the Environment Level

Connection settings (URI, credentials, etc.) can be set at the environment level. They are then reused for each connection. It is possible to override the environment connection settings for a given connection:

Connection settings at the environment and connection levels
Environment environment = new AmqpEnvironmentBuilder()
    .connectionSettings()
    .uri("amqp://guest:guest@localhost:5672/%2f") (1)
    .environmentBuilder().build();

Connection connection = environment.connectionBuilder()
    .uri("amqp://admin:admin@localhost:5672/%2f") (2)
    .build();
1 Use the guest user by default
2 Use the admin user for this connection
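
The URIs above pack several settings into one string. The following sketch uses only java.net.URI from the JDK to show what each part carries; in particular, %2f is the percent-encoded form of /, the name of the default virtual host in RabbitMQ. The AmqpUriParts class is hypothetical, and how the client parses the URI internally is not shown here.

```java
import java.net.URI;

// Hypothetical helper, not part of the client library
class AmqpUriParts {

  // The virtual host is the decoded URI path without its leading slash: "/%2f" -> "/"
  static String virtualHost(URI uri) {
    String path = uri.getPath(); // getPath() decodes percent escapes: "/%2f" -> "//"
    if (path == null || path.length() <= 1) {
      return null; // no virtual host specified in the URI
    }
    return path.substring(1);
  }

  // The user name is the part of the user info before the first colon
  static String username(URI uri) {
    String userInfo = uri.getUserInfo(); // e.g. "guest:guest"
    return userInfo == null ? null : userInfo.split(":", 2)[0];
  }

  public static void main(String[] args) {
    URI uri = URI.create("amqp://guest:guest@localhost:5672/%2f");
    System.out.println(uri.getHost());    // localhost
    System.out.println(uri.getPort());    // 5672
    System.out.println(username(uri));    // guest
    System.out.println(virtualHost(uri)); // /
  }
}
```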

Subscription Listener

The client provides a SubscriptionListener interface callback to add behavior before a subscription is created. This callback is meant for stream consumers: it can be used to dynamically set the offset the consumer attaches to in the stream. It is called when the consumer is first created and when the client has to re-subscribe (e.g. after a disconnection).

It is possible to use the callback to get the last processed offset from an external store. The following code snippet shows how this can be done (note the interaction with the external store is not detailed):

Using the subscription listener to attach to a stream
connection.consumerBuilder()
    .queue("some-stream")
    .subscriptionListener(ctx -> {  (1)
      long offset = getOffsetFromExternalStore();  (2)
      ctx.streamOptions().offset(offset + 1);  (3)
    })
    .messageHandler((ctx, msg) -> {
      // message handling code...

      long offset = (long) msg.annotation("x-stream-offset");  (4)
      storeOffsetInExternalStore(offset);  (5)
    })
    .build();
1 Set subscription listener
2 Get offset from external store
3 Set offset to use for the subscription
4 Get the message offset
5 Store the offset in the external store after processing
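
The reason for offset + 1 is that the stored value is the offset of the last processed message, so the next subscription should start right after it. The following self-contained sketch simulates this logic without the library: a list stands in for the stream (the index plays the role of the offset) and an AtomicLong stands in for the external store. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Simulation only: no broker and no client library involved
class OffsetResumeSketch {

  // Stand-in for the external store; -1 means nothing has been processed yet
  static final AtomicLong externalStore = new AtomicLong(-1);

  // What the subscription listener computes: last processed offset + 1
  static long nextOffset() {
    return externalStore.get() + 1;
  }

  // Consumes at most max messages starting at firstOffset, storing each offset after processing
  static List<String> consumeFrom(List<String> stream, long firstOffset, int max) {
    List<String> processed = new ArrayList<>();
    for (long offset = firstOffset; offset < stream.size() && processed.size() < max; offset++) {
      processed.add(stream.get((int) offset)); // message handling
      externalStore.set(offset);               // store the offset after processing
    }
    return processed;
  }

  public static void main(String[] args) {
    List<String> stream = List.of("m0", "m1", "m2", "m3", "m4");
    // First subscription: processes 2 messages, then the consumer is "disconnected"
    System.out.println(consumeFrom(stream, nextOffset(), 2));  // [m0, m1]
    // Re-subscription: resumes right after the last processed message
    System.out.println(consumeFrom(stream, nextOffset(), 10)); // [m2, m3, m4]
  }
}
```

Storing the offset only after the message is processed means a crash between the two steps leads to the message being processed again, not skipped.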

Metrics Collection

The library provides the MetricsCollector abstraction to collect metrics. A MetricsCollector instance is set at the environment level and is called in several places by the library. The underlying implementation can then collect, compute and expose metrics.

The library provides the MicrometerMetricsCollector, an implementation based on Micrometer. Micrometer is a façade for observability systems: the user can choose the Micrometer meter registry for their favorite system.

Here is how to set up the Micrometer metrics collector and configure it for Prometheus:

Setting metrics collection with Prometheus
PrometheusMeterRegistry registry = new PrometheusMeterRegistry( (1)
    PrometheusConfig.DEFAULT
);
MetricsCollector collector = new MicrometerMetricsCollector(registry); (2)

Environment environment = new AmqpEnvironmentBuilder()
    .metricsCollector(collector) (3)
    .build();

Connection connection = environment.connectionBuilder().build(); (4)
1 Create the Micrometer meter registry for Prometheus
2 Create the metrics collector with the registry
3 Set the metrics collector in the environment builder
4 Create a connection; a Micrometer gauge is incremented

The following metrics are recorded:

  • the number of open connections

  • the number of open publishers

  • the number of open consumers

  • the total number of published messages

  • the total number of accepted published messages

  • the total number of rejected published messages

  • the total number of released published messages

  • the total number of consumed messages

  • the total number of accepted consumed messages

  • the total number of requeued consumed messages

  • the total number of discarded consumed messages

Here is an example of the Prometheus output:

Example of the Prometheus output
# HELP rabbitmq_amqp_connections
# TYPE rabbitmq_amqp_connections gauge
rabbitmq_amqp_connections 1.0
# HELP rabbitmq_amqp_consumed_total
# TYPE rabbitmq_amqp_consumed_total counter
rabbitmq_amqp_consumed_total 3.0
# HELP rabbitmq_amqp_consumed_accepted_total
# TYPE rabbitmq_amqp_consumed_accepted_total counter
rabbitmq_amqp_consumed_accepted_total 1.0
# HELP rabbitmq_amqp_consumed_discarded_total
# TYPE rabbitmq_amqp_consumed_discarded_total counter
rabbitmq_amqp_consumed_discarded_total 1.0
# HELP rabbitmq_amqp_consumed_requeued_total
# TYPE rabbitmq_amqp_consumed_requeued_total counter
rabbitmq_amqp_consumed_requeued_total 1.0
# HELP rabbitmq_amqp_consumers
# TYPE rabbitmq_amqp_consumers gauge
rabbitmq_amqp_consumers 1.0
# HELP rabbitmq_amqp_published_total
# TYPE rabbitmq_amqp_published_total counter
rabbitmq_amqp_published_total 2.0
# HELP rabbitmq_amqp_published_accepted_total
# TYPE rabbitmq_amqp_published_accepted_total counter
rabbitmq_amqp_published_accepted_total 1.0
# HELP rabbitmq_amqp_published_rejected_total
# TYPE rabbitmq_amqp_published_rejected_total counter
rabbitmq_amqp_published_rejected_total 1.0
# HELP rabbitmq_amqp_published_released_total
# TYPE rabbitmq_amqp_published_released_total counter
rabbitmq_amqp_published_released_total 1.0
# HELP rabbitmq_amqp_publishers
# TYPE rabbitmq_amqp_publishers gauge
rabbitmq_amqp_publishers 1.0
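
To make the output above more concrete, here is a small sketch that reads such text and derives a ratio from two counters. It is a deliberately simplified parser written for this example (it assumes plain "name value" lines without labels or timestamps) and the PrometheusSampleParser class is hypothetical. In practice a Prometheus server scrapes and stores these values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified parser for illustration, limited to "name value" sample lines
class PrometheusSampleParser {

  static Map<String, Double> parse(String scrape) {
    Map<String, Double> samples = new LinkedHashMap<>();
    for (String line : scrape.split("\n")) {
      line = line.trim();
      if (line.isEmpty() || line.startsWith("#")) {
        continue; // skip blank lines and # HELP / # TYPE comments
      }
      int separator = line.lastIndexOf(' ');
      if (separator < 0) {
        continue; // not a "name value" line
      }
      samples.put(
          line.substring(0, separator),
          Double.parseDouble(line.substring(separator + 1)));
    }
    return samples;
  }

  public static void main(String[] args) {
    String scrape = String.join("\n",
        "# HELP rabbitmq_amqp_published_total",
        "# TYPE rabbitmq_amqp_published_total counter",
        "rabbitmq_amqp_published_total 2.0",
        "# HELP rabbitmq_amqp_published_accepted_total",
        "# TYPE rabbitmq_amqp_published_accepted_total counter",
        "rabbitmq_amqp_published_accepted_total 1.0");
    Map<String, Double> samples = parse(scrape);
    double ratio = samples.get("rabbitmq_amqp_published_accepted_total")
        / samples.get("rabbitmq_amqp_published_total");
    System.out.println("accepted ratio: " + ratio); // accepted ratio: 0.5
  }
}
```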

Remote Procedure Call (RPC)

Remote procedure call with RabbitMQ consists of a client sending a request message and a server replying with a response message. Both the RPC client and server are client applications and the messages flow through the broker. The RPC client must send a reply-to queue address with the request. The RPC server uses this reply-to queue address to send the response. There must also be a way to correlate a request with its response; this is usually handled with a header that the RPC client and server agree on.

The library provides RPC client and server support classes. They use sensible defaults and some of the internal mechanics are configurable. They should meet the requirements of most RPC use cases. It is still possible to implement one part or the other with regular publishers and consumers for special cases, as this is what the RPC support classes do.
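
The correlation mechanism can be sketched in plain Java, without a broker. In the hypothetical class below, the client keeps a map of outstanding requests keyed by correlation ID; the "server" copies the correlation ID of the request into its reply, and the client uses it to complete the right future. This is a simplified, in-process model of the idea, not the implementation of the support classes.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.UnaryOperator;

// In-process model of request/reply correlation (no broker, no client library)
class RpcCorrelationSketch {

  // Minimal message: a correlation ID and a body
  static final class Msg {
    final String correlationId;
    final String body;

    Msg(String correlationId, String body) {
      this.correlationId = correlationId;
      this.body = body;
    }
  }

  private final Map<String, CompletableFuture<String>> outstanding = new ConcurrentHashMap<>();
  private final AtomicLong sequence = new AtomicLong();
  private final UnaryOperator<Msg> server; // stands in for the broker and the RPC server

  RpcCorrelationSketch(UnaryOperator<String> handler) {
    // Server side: process the request body and copy the correlation ID into the reply
    this.server = request -> new Msg(request.correlationId, handler.apply(request.body));
  }

  // Client side: register the request, "send" it, and hand the reply to onReply
  CompletableFuture<String> request(String body) {
    String correlationId = "req-" + sequence.getAndIncrement();
    CompletableFuture<String> reply = new CompletableFuture<>();
    outstanding.put(correlationId, reply);
    onReply(server.apply(new Msg(correlationId, body)));
    return reply;
  }

  // Client side: find the outstanding request matching the reply and complete it
  void onReply(Msg reply) {
    CompletableFuture<String> future = outstanding.remove(reply.correlationId);
    if (future != null) {
      future.complete(reply.body);
    }
  }

  int outstandingCount() {
    return outstanding.size();
  }

  public static void main(String[] args) {
    RpcCorrelationSketch client = new RpcCorrelationSketch(in -> "*** " + in + " ***");
    System.out.println(client.request("hello").join()); // *** hello ***
  }
}
```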

Here is how to create an RPC server instance:

Creating an RPC server
RpcServer rpcServer = connection.rpcServerBuilder() (1)
    .requestQueue("rpc-server") (2)
    .handler((ctx, req) -> { (3)
      String in = new String(req.body(), UTF_8);
      String out = "*** " + in + " ***";
      return ctx.message(out.getBytes(UTF_8)); (4)
    }).build();
1 Use builder from connection
2 Set the queue to consume requests from (it must exist)
3 Define the processing logic
4 Create the reply message

Note the RPC server does not create the queue it waits for requests on. It must be created beforehand.

Here is how to create an RPC client:

Creating an RPC client
RpcClient rpcClient = connection.rpcClientBuilder() (1)
    .requestAddress().queue("rpc-server") (2)
    .rpcClient()
    .build();
1 Use builder from connection
2 Set the address to send request messages to

The RPC client will send its request to the configured destination. It can be an exchange or a queue, as in the example above.

Here is how to send a request:

Sending a request
Message request = rpcClient.message("hello".getBytes(UTF_8)); (1)
CompletableFuture<Message> replyFuture = rpcClient.publish(request); (2)
Message reply = replyFuture.get(10, TimeUnit.SECONDS); (3)
1 Create the message request
2 Send the request
3 Wait for the reply (synchronously)

The RpcClient#publish(Message) method returns a CompletableFuture<Message> that holds the reply message. It is then possible to wait for the reply asynchronously or synchronously.
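
For the asynchronous style, the standard CompletableFuture API is enough. The sketch below uses a CompletableFuture<String> as a stand-in for the CompletableFuture<Message> returned by the RPC client, so that it runs without a broker; the ReplyHandlingSketch class and its fallback value are hypothetical. It relies on CompletableFuture#orTimeout, available since Java 9.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Stand-in example: String replaces the Message type of the client library
class ReplyHandlingSketch {

  // Asynchronous style: no blocking, a fallback value is used on timeout or failure
  static CompletableFuture<String> withFallback(
      CompletableFuture<String> replyFuture, long timeoutMs, String fallback) {
    return replyFuture
        .orTimeout(timeoutMs, TimeUnit.MILLISECONDS) // fails the future if no reply arrives in time
        .exceptionally(ex -> fallback);              // maps any failure to the fallback
  }

  public static void main(String[] args) {
    // A reply that arrives before the timeout
    CompletableFuture<String> reply = new CompletableFuture<>();
    CompletableFuture<String> handled = withFallback(reply, 1000, "no reply");
    reply.complete("*** hello ***");
    System.out.println(handled.join()); // *** hello ***

    // A reply that never arrives
    CompletableFuture<String> never = new CompletableFuture<>();
    System.out.println(withFallback(never, 50, "no reply").join()); // no reply
  }
}
```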

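The RPC server uses the following defaults:

  • it extracts the correlation ID from the request message-id property.

  • it sets the reply correlation-id property to the extracted correlation ID.

  • it sends the reply to the address found in the request reply-to property.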

The RPC client uses the following defaults:

  • it creates and waits for replies on an auto-delete, exclusive queue if no reply-to queue is set.

  • it uses a string-based correlation ID generator, with a fixed random UUID prefix and a strictly monotonically increasing sequence suffix ({UUID}-{sequence}, e.g. 6f839461-6b19-47e1-80b3-6be10d899d85-42). The prefix is different for each RpcClient instance and the suffix is incremented by one for each request.

  • it sets the request reply-to property to the reply-to queue address (defined by the user or created automatically, see above).

  • it sets the request message-id property to the generated correlation ID.

  • it extracts the correlation ID from the reply correlation-id property to correlate a reply with the appropriate request.
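
The default correlation ID format described above can be sketched as follows. This is not the library's code: the class name is hypothetical and the starting value of the sequence (0 here) is an assumption. It shows why the scheme is cheap: the random UUID is generated once per instance and each request only increments a counter.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Sketch of a {UUID}-{sequence} generator; the sequence start value is an assumption
class PrefixedSequenceCorrelationIds implements Supplier<String> {

  // Random prefix, generated once and fixed for the lifetime of the instance
  private final String prefix = UUID.randomUUID().toString();

  // Suffix, incremented by one for each generated ID
  private final AtomicLong sequence = new AtomicLong();

  @Override
  public String get() {
    return prefix + "-" + sequence.getAndIncrement();
  }

  public static void main(String[] args) {
    Supplier<String> generator = new PrefixedSequenceCorrelationIds();
    System.out.println(generator.get()); // same prefix, suffix 0
    System.out.println(generator.get()); // same prefix, suffix 1
  }
}
```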

Let’s see how to customize some of the RPC support mechanics. Imagine the request message-id property is a critical piece of information and we do not want to use it as the correlation ID. The request can use the correlation-id property and the RPC server just has to extract the correlation ID from this property (instead of the message-id property by default). Let’s also use a random UUID for the correlation ID generation (avoid doing this in production: this is OK in terms of uniqueness but not optimal in terms of performance because randomness is not cheap).

Here is how to declare the RPC client:

Customizing the RPC client
String replyToQueue = connection.management().queue()
    .autoDelete(true).exclusive(true)
    .declare().name(); (1)
RpcClient rpcClient = connection.rpcClientBuilder()
    .correlationIdSupplier(UUID::randomUUID) (2)
    .requestPostProcessor((msg, corrId) ->
        msg.correlationId(corrId) (3)
           .replyToAddress().queue(replyToQueue).message()) (4)
    .replyToQueue(replyToQueue)
    .requestAddress().queue("rpc-server") (5)
    .rpcClient()
    .build();
1 Declare the reply-to queue
2 Use a random UUID as correlation ID
3 Use the correlation-id property for the request
4 Set the reply-to property
5 Set the address to send request messages to

We just have to tell the RPC server to get the correlation ID from the request correlation-id property:

Customizing the RPC server
RpcServer rpcServer = connection.rpcServerBuilder()
    .correlationIdExtractor(Message::correlationId) (1)
    .requestQueue("rpc-server")
    .handler((ctx, req) -> {
      String in = new String(req.body(), UTF_8);
      String out = "*** " + in + " ***";
      return ctx.message(out.getBytes(UTF_8));
    }).build();
1 Get the correlation ID from the request correlation-id property

Appendix A: Micrometer Observation

It is possible to use Micrometer Observation to instrument publishing and consuming in the AMQP Java client. Micrometer Observation provides metrics, tracing, and log correlation with one single API.

The AMQP Java client provides an ObservationCollector abstraction and an implementation for Micrometer Observation. The following snippet shows how to create and set up the Micrometer ObservationCollector implementation with an existing ObservationRegistry:

Configuring Micrometer Observation
Environment environment = new AmqpEnvironmentBuilder()
    .observationCollector(new MicrometerObservationCollectorBuilder()  (1)
        .registry(observationRegistry).build())  (2)
    .build();
1 Configure Micrometer ObservationCollector with builder
2 Set Micrometer ObservationRegistry

The next sections document the conventions, spans, and metrics made available by the instrumentation. They are automatically generated from the source code with the Micrometer documentation generator.

Observability - Conventions

Below you can find a list of all GlobalObservationConvention and ObservationConvention declared by this project.

Table 1. ObservationConvention implementations

ObservationConvention Class Name                                                     Applicable ObservationContext Class Name
com.rabbitmq.client.amqp.observation.micrometer.DefaultProcessObservationConvention  DeliverContext
com.rabbitmq.client.amqp.observation.micrometer.DeliverObservationConvention         DeliverContext
com.rabbitmq.client.amqp.observation.micrometer.DefaultPublishObservationConvention  PublishContext
com.rabbitmq.client.amqp.observation.micrometer.PublishObservationConvention         PublishContext

Observability - Spans

Below you can find a list of all spans declared by this project.

Process Observation Span

Observation for processing a message.

Span name rabbitmq.amqp.process (defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultProcessObservationConvention).

Fully qualified name of the enclosing class com.rabbitmq.client.amqp.observation.micrometer.AmqpObservationDocumentation.

Table 2. Tag Keys

Name                             Description
messaging.operation (required)   A string identifying the kind of messaging operation.
messaging.system (required)      A string identifying the messaging system.
net.protocol.name (required)     A string identifying the protocol (AMQP).
net.protocol.version (required)  A string identifying the protocol version (1.0).

Publish Observation Span

Observation for publishing a message.

Span name rabbitmq.amqp.publish (defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultPublishObservationConvention).

Fully qualified name of the enclosing class com.rabbitmq.client.amqp.observation.micrometer.AmqpObservationDocumentation.

Table 3. Tag Keys

Name                             Description
messaging.operation (required)   A string identifying the kind of messaging operation.
messaging.system (required)      A string identifying the messaging system.
net.protocol.name (required)     A string identifying the protocol (AMQP).
net.protocol.version (required)  A string identifying the protocol version (1.0).

Observability - Metrics

Below you can find a list of all metrics declared by this project.

Process Observation

Observation for processing a message.

Metric name rabbitmq.amqp.process (defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultProcessObservationConvention). Type timer.

Metric name rabbitmq.amqp.process.active (defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultProcessObservationConvention). Type long task timer.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the base unit. However, each backend determines the actual base unit (e.g. Prometheus uses seconds).

Fully qualified name of the enclosing class com.rabbitmq.client.amqp.observation.micrometer.AmqpObservationDocumentation.

Table 4. Low cardinality Keys

Name                             Description
messaging.operation (required)   A string identifying the kind of messaging operation.
messaging.system (required)      A string identifying the messaging system.
net.protocol.name (required)     A string identifying the protocol (AMQP).
net.protocol.version (required)  A string identifying the protocol version (1.0).

Publish Observation

Observation for publishing a message.

Metric name rabbitmq.amqp.publish (defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultPublishObservationConvention). Type timer.

Metric name rabbitmq.amqp.publish.active (defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultPublishObservationConvention). Type long task timer.

KeyValues that are added after starting the Observation might be missing from the *.active metrics.
Micrometer internally uses nanoseconds for the base unit. However, each backend determines the actual base unit (e.g. Prometheus uses seconds).

Fully qualified name of the enclosing class com.rabbitmq.client.amqp.observation.micrometer.AmqpObservationDocumentation.

Table 5. Low cardinality Keys

Name                             Description
messaging.operation (required)   A string identifying the kind of messaging operation.
messaging.system (required)      A string identifying the messaging system.
net.protocol.name (required)     A string identifying the protocol (AMQP).
net.protocol.version (required)  A string identifying the protocol version (1.0).