The RabbitMQ AMQP 1.0 Java Client is a library to communicate with RabbitMQ using AMQP 1.0.
Pre-requisites
This library requires at least Java 11 (Java 21+ is recommended) and RabbitMQ 4.0 or more.
License
The library is open source, developed on GitHub, and is licensed under the Apache Public License 2.0
Dependencies
Use your favorite build management tool to add the client dependencies to your project.
Maven
<dependencies>
<dependency>
<groupId>com.rabbitmq.client</groupId>
<artifactId>amqp-client</artifactId>
<version>0.3.0</version>
</dependency>
</dependencies>
Snapshots require to declare the appropriate repository.
Gradle
dependencies {
compile "com.rabbitmq.client:amqp-client:0.3.0"
}
Snapshots require to declare the appropriate repository.
Snapshots
Releases are available from Maven Central, which does not require specific declaration. Snapshots are available from a repository which must be declared in the dependency management configuration.
With Maven:
<repositories>
<repository>
<id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<snapshots><enabled>true</enabled></snapshots>
<releases><enabled>false</enabled></releases>
</repository>
</repositories>
With Gradle:
repositories {
maven { url 'https://oss.sonatype.org/content/repositories/snapshots' }
mavenCentral()
}
Versioning
The RabbitMQ AMQP 1.0 Java Client is in development and stabilization phase. When the stabilization phase ends, a 1.0.0 version will be cut, and semantic versioning is likely to be enforced.
Before reaching the stable phase, the client will use a versioning scheme of [0.MINOR.PATCH]
where:
-
0
indicates the project is still in a stabilization phase. -
MINOR
is a 0-based number incrementing with each new release cycle. It generally reflects significant changes like new features and potentially some programming interfaces changes. -
PATCH
is a 0-based number incrementing with each service release, that is bux fixes.
Breaking changes between releases can happen but will be kept to a minimum. The next section provides more details about the evolution of programming interfaces.
Stability of Programming Interfaces
The RabbitMQ AMQP 1.0 Java Client is in active development but its programming interfaces will remain as stable as possible. There is no guarantee though that they will remain completely stable, at least until it reaches version 1.0.0.
The client contains 2 sets of programming interfaces whose stability are of interest for application developers:
-
Application Programming Interfaces (API): those are the ones used to write application logic. They include the interfaces and classes in the
com.rabbitmq.client.amqp
package (e.g.Connection
,Publisher
,Consumer
,Message
). These API constitute the main programming model of the client and will be kept as stable as possible. -
Service Provider Interfaces (SPI): those are interfaces to implement mainly technical behavior in the client. They are not meant to be used to implement application logic. Application developers may have to refer to them in the configuration phase and if they want to customize some internal behavior of the client. SPI packages and interfaces are marked as such in their Javadoc. These SPI are susceptible to change, but this should have no impact on most applications, as the changes are likely to be limited to the client internals.
Overview
The RabbitMQ team maintains a set of AMQP 1.0 client libraries designed and optimized for RabbitMQ. They offer a simple and safe, yet powerful API on top of AMQP 1.0. Applications can publish and consume messages with these libraries, as well as manage the server topology in a consistent way across programming languages. The libraries also offer advanced features like automatic connection and topology recovery, and connection affinity with queues.
The RabbitMQ AMQP 1.0 Java Client is one of these libraries. This documentation covers only the advanced usage of the library; a common page on the RabbitMQ website covers the most common use cases.
Advanced Usage
Connection Settings at the Environment Level
Connection settings (URI, credentials, etc) can be set at the environment level. They will then be re-used for each connection. It is possible to override the environment connection settings for a given connection:
Environment environment = new AmqpEnvironmentBuilder()
.connectionSettings()
.uri("amqp://guest:guest@localhost:5672/%2f") (1)
.environmentBuilder().build();
Connection connection = environment.connectionBuilder()
.uri("amqp://admin:admin@localhost:5672/%2f") (2)
.build();
1 | Use the guest user by default |
2 | Use the admin user for this connection |
Subscription Listener
The client provides a SubscriptionListener
interface callback to add behavior before a subscription is created.
This callback is meant for stream consumers: it can be used to dynamically set the offset the consumer attaches to in the stream.
It is called when the consumer is first created and when the client has to re-subscribe (e.g. after a disconnection).
It is possible to use the callback to get the last processed offset from an external store. The following code snippet shows how this can be done (note the interaction with the external store is not detailed):
connection.consumerBuilder()
.queue("some-stream")
.subscriptionListener(ctx -> { (1)
long offset = getOffsetFromExternalStore(); (2)
ctx.streamOptions().offset(offset + 1); (3)
})
.messageHandler((ctx, msg) -> {
// message handling code...
long offset = (long) msg.annotation("x-stream-offset"); (4)
storeOffsetInExternalStore(offset); (5)
})
.build();
1 | Set subscription listener |
2 | Get offset from external store |
3 | Set offset to use for the subscription |
4 | Get the message offset |
5 | Store the offset in the external store after processing |
Metrics Collection
The library provides the MetricsCollector
abstraction to collect metrics.
A MetricsCollector
instance is set at the environment level and is called in several places by the library.
The underlying implementation can then collect, compute and expose metrics.
The library provides the MicrometerMetricsCollector
, an implementation based on Micrometer.
Micrometer is a façade for observability systems: the user can choose the Micrometer meter registry for their favorite system.
Here is how to set up the Micrometer metrics collector and configure it for Prometheus:
PrometheusMeterRegistry registry = new PrometheusMeterRegistry( (1)
PrometheusConfig.DEFAULT
);
MetricsCollector collector = new MicrometerMetricsCollector(registry); (2)
Environment environment = new AmqpEnvironmentBuilder()
.metricsCollector(collector) (3)
.build();
Connection connection = environment.connectionBuilder().build(); (4)
1 | Create the Micrometer meter registry for Prometheus |
2 | Create the metrics collector with the registry |
3 | Set the metrics collector in the environment builder |
4 | Create a connection, a Micrometer gauge is incremented |
The following metrics are recorded:
-
the number of open connections
-
the number of open publishers
-
the number of open consumers
-
the total number of published messages
-
the total number of accepted published messages
-
the total number of rejected published messages
-
the total number of released published messages
-
the total number of consumed messages
-
the total number of accepted consumed messages
-
the total number of requeued consumed messages
-
the total number of discarded consumed messages
Here is an example of the Prometheus output:
# HELP rabbitmq_amqp_connections
# TYPE rabbitmq_amqp_connections gauge
rabbitmq_amqp_connections 1.0
# HELP rabbitmq_amqp_consumed_total
# TYPE rabbitmq_amqp_consumed_total counter
rabbitmq_amqp_consumed_total 3.0
# HELP rabbitmq_amqp_consumed_accepted_total
# TYPE rabbitmq_amqp_consumed_accepted_total counter
rabbitmq_amqp_consumed_accepted_total 1.0
# HELP rabbitmq_amqp_consumed_discarded_total
# TYPE rabbitmq_amqp_consumed_discarded_total counter
rabbitmq_amqp_consumed_discarded_total 1.0
# HELP rabbitmq_amqp_consumed_requeued_total
# TYPE rabbitmq_amqp_consumed_requeued_total counter
rabbitmq_amqp_consumed_requeued_total 1.0
# HELP rabbitmq_amqp_consumers
# TYPE rabbitmq_amqp_consumers gauge
rabbitmq_amqp_consumers 1.0
# HELP rabbitmq_amqp_published_total
# TYPE rabbitmq_amqp_published_total counter
rabbitmq_amqp_published_total 2.0
# HELP rabbitmq_amqp_published_accepted_total
# TYPE rabbitmq_amqp_published_accepted_total counter
rabbitmq_amqp_published_accepted_total 1.0
# HELP rabbitmq_amqp_published_rejected_total
# TYPE rabbitmq_amqp_published_rejected_total counter
rabbitmq_amqp_published_rejected_total 1.0
# HELP rabbitmq_amqp_published_released_total
# TYPE rabbitmq_amqp_published_released_total counter
rabbitmq_amqp_published_released_total 1.0
# HELP rabbitmq_amqp_publishers
# TYPE rabbitmq_amqp_publishers gauge
rabbitmq_amqp_publishers 1.0
Remote Procedure Call (RPC)
Remote procedure call with RabbitMQ consists in a client sending a request message and a server replying with a response message. Both the RPC client and server are client applications and the messages flow through the broker. The RPC client must send a reply-to queue address with the request. The RPC server uses this reply-to queue address to send the response. There must also be a way to correlate a request with its response, this is usually handled with a header that the RPC client and server agree on.
The library provides RPC client and server support classes. They use sensible defaults and some of the internal mechanics are configurable. They should meet the requirements of most RPC use cases. It is still possible to implement one part or the other with regular publishers and consumers for special cases, as this is what the RPC support classes do.
Here is how to create an RPC server instance:
RpcServer rpcServer = connection.rpcServerBuilder() (1)
.requestQueue("rpc-server") (2)
.handler((ctx, req) -> { (3)
String in = new String(req.body(), UTF_8);
String out = "*** " + in + " ***";
return ctx.message(out.getBytes(UTF_8)); (4)
}).build();
1 | Use builder from connection |
2 | Set the queue to consume requests from (it must exist) |
3 | Define the processing logic |
4 | Create the reply message |
Note the RPC server does not create the queue it waits requests on. It must be created beforehand.
Here is how to create an RPC client:
RpcClient rpcClient = connection.rpcClientBuilder() (1)
.requestAddress().queue("rpc-server") (2)
.rpcClient()
.build();
1 | Use builder from connection |
2 | Set the address to send request messages to |
The RPC client will send its request to the configured destination. It can be an exchange or a queue, like in the example above.
Here is how to send a request:
Message request = rpcClient.message("hello".getBytes(UTF_8)); (1)
CompletableFuture<Message> replyFuture = rpcClient.publish(request); (2)
Message reply = replyFuture.get(10, TimeUnit.SECONDS); (3)
1 | Create the message request |
2 | Send the request |
3 | Wait for the reply (synchronously) |
The RpcClient#publish(Message)
method returns a CompletableFuture<Message>
that holds the reply message.
It is then possible to wait for the reply asynchronously or synchronously.
The RPC server uses the following defaults:
-
it uses the request
message-id
property for the correlation ID. -
it assigns the correlation ID (so the request
message-id
by default) to the replycorrelation-id
property. -
it assigns the request
reply-to
property to the replyto
property if it is defined. This behavior is hardcoded but it is possible to cancel it thanks to a reply post-processor.
The RPC client uses the following defaults:
-
it creates and waits for replies on an auto-delete, exclusive queue if no reply-to queue is set.
-
it uses a string-based correlation ID generator, with a fixed random UUID prefix and a strictly monotonic increasing sequence suffix (
{UUID}-{sequence}
, e.g.6f839461-6b19-47e1-80b3-6be10d899d85-42
). The prefix is different for eachRpcClient
instance and the suffix is incremented by one for each request. -
it sets the request
reply-to
property to the reply-to queue address (defined by the user or created automatically, see above). -
it sets the request
message-id
property to the generated correlation ID. -
it extracts the correlation ID from the reply
correlation-id
property to correlate a reply with the appropriate request.
Let’s see how to customize some of the RPC support mechanics.
Imagine the request message-id
property is a critical piece of information and we do not want to use it as the correlation ID.
The request can use the correlation-id
property and the RPC server just has to extract the correlation ID from this property (instead of the message-id
property by default).
Let’s also use a random UUID for the correlation ID generation (avoids doing this in production: this is OK in terms of uniqueness but not optimal in terms of performance because randomness is not cheap).
Here is how to declare the RPC client:
String replyToQueue = connection.management().queue()
.autoDelete(true).exclusive(true)
.declare().name(); (1)
RpcClient rpcClient = connection.rpcClientBuilder()
.correlationIdSupplier(UUID::randomUUID) (2)
.requestPostProcessor((msg, corrId) ->
msg.correlationId(corrId) (3)
.replyToAddress().queue(replyToQueue).message()) (4)
.replyToQueue(replyToQueue)
.requestAddress().queue("rpc-server") (5)
.rpcClient()
.build();
1 | Declare the reply-to queue |
2 | Use a random UUID as correlation ID |
3 | Use the correlation-id property for the request |
4 | Set the reply-to property |
5 | Set the address to send request messages to |
We just have to tell the RPC server to get the correlation ID from the request correlation-id
property:
RpcServer rpcServer = connection.rpcServerBuilder()
.correlationIdExtractor(Message::correlationId) (1)
.requestQueue("rpc-server")
.handler((ctx, req) -> {
String in = new String(req.body(), UTF_8);
String out = "*** " + in + " ***";
return ctx.message(out.getBytes(UTF_8));
}).build();
1 | Get the correlation ID from the request correlation-id property |
Appendix A: Micrometer Observation
It is possible to use Micrometer Observation to instrument publishing and consuming in the AMQP Java client. Micrometer Observation provides metrics, tracing, and log correlation with one single API.
The AMQP Java client provides an ObservationCollector
abstraction and an implementation for Micrometer Observation.
The following snippet shows how to create and set up the Micrometer ObservationCollector
implementation with an existing ObservationRegistry
:
Environment environment = new AmqpEnvironmentBuilder()
.observationCollector(new MicrometerObservationCollectorBuilder() (1)
.registry(observationRegistry).build()) (2)
.build();
1 | Configure Micrometer ObservationCollector with builder |
2 | Set Micrometer ObservationRegistry |
The next sections document the conventions, spans, and metrics made available by the instrumentation. They are automatically generated from the source code with the Micrometer documentation generator.
Observability - Conventions
Below you can find a list of all GlobalObservationConvention
and ObservationConvention
declared by this project.
ObservationConvention Class Name |
Applicable ObservationContext Class Name |
|
|
|
|
|
|
|
|
Observability - Spans
Below you can find a list of all spans declared by this project.
Process Observation Span
Observation for processing a message.
Span name rabbitmq.amqp.process
(defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultProcessObservationConvention
).
Fully qualified name of the enclosing class com.rabbitmq.client.amqp.observation.micrometer.AmqpObservationDocumentation
.
Name |
Description |
|
A string identifying the kind of messaging operation. |
|
A string identifying the messaging system. |
|
A string identifying the protocol (AMQP). |
|
A string identifying the protocol version (1.0). |
Publish Observation Span
Observation for publishing a message.
Span name rabbitmq.amqp.publish
(defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultPublishObservationConvention
).
Fully qualified name of the enclosing class com.rabbitmq.client.amqp.observation.micrometer.AmqpObservationDocumentation
.
Name |
Description |
|
A string identifying the kind of messaging operation. |
|
A string identifying the messaging system. |
|
A string identifying the protocol (AMQP). |
|
A string identifying the protocol version (1.0). |
Observability - Metrics
Below you can find a list of all metrics declared by this project.
Process Observation
Observation for processing a message.
Metric name rabbitmq.amqp.process
(defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultProcessObservationConvention
). Type timer
.
Metric name rabbitmq.amqp.process.active
(defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultProcessObservationConvention
). Type long task timer
.
KeyValues that are added after starting the Observation might be missing from the *.active metrics. |
Micrometer internally uses nanoseconds for the baseunit. However, each backend determines the actual baseunit. (i.e. Prometheus uses seconds)
|
Fully qualified name of the enclosing class com.rabbitmq.client.amqp.observation.micrometer.AmqpObservationDocumentation
.
Name |
Description |
|
A string identifying the kind of messaging operation. |
|
A string identifying the messaging system. |
|
A string identifying the protocol (AMQP). |
|
A string identifying the protocol version (1.0). |
Publish Observation
Observation for publishing a message.
Metric name rabbitmq.amqp.publish
(defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultPublishObservationConvention
). Type timer
.
Metric name rabbitmq.amqp.publish.active
(defined by convention class com.rabbitmq.client.amqp.observation.micrometer.DefaultPublishObservationConvention
). Type long task timer
.
KeyValues that are added after starting the Observation might be missing from the *.active metrics. |
Micrometer internally uses nanoseconds for the baseunit. However, each backend determines the actual baseunit. (i.e. Prometheus uses seconds)
|
Fully qualified name of the enclosing class com.rabbitmq.client.amqp.observation.micrometer.AmqpObservationDocumentation
.
Name |
Description |
|
A string identifying the kind of messaging operation. |
|
A string identifying the messaging system. |
|
A string identifying the protocol (AMQP). |
|
A string identifying the protocol version (1.0). |