A detailed AMQP wrapper for Laravel and Lumen to publish and consume messages, especially from RabbitMQ. This package provides full support for RabbitMQ features including RPC patterns, management operations, message properties, and more.
- Advanced queue configuration
- Easy message publishing to queues
- Flexible queue consumption with useful options
- Support for all RabbitMQ exchange types (topic, direct, fanout, headers)
- Full AMQP message properties support
- RPC Pattern Support - Built-in request-response patterns with rpc() and reply() methods
- Queue Management - Programmatic control (purge, delete, unbind)
- Management HTTP API - Full integration with RabbitMQ Management API
- Policy Management - Create, update, and delete policies programmatically
- Feature Flags - Query RabbitMQ feature flags
- Enhanced Message Properties - Full support for priority, correlation_id, headers, etc.
- Listen Method - Auto-create queues and bind to multiple routing keys
- Connection Configuration Helper - Easy access to connection configs
- Publisher Confirms - Guaranteed message delivery
- Consumer Prefetch (QoS) - Rate limiting and flow control
- Queue Types - Classic, Quorum, and Stream queues
- Dead Letter Exchanges - Message routing for failed messages
- Message Priority - Priority-based message processing
- TTL Support - Message and queue expiration
- Lazy Queues - Disk-based message storage
- Alternate Exchange - Unroutable message handling
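Several of the features listed above (dead letter exchanges, TTL, message priority) correspond to standard RabbitMQ queue arguments. The sketch below builds a queue_properties array for them. It assumes the typed [type, value] form seen in the package's default config ('x-ha-policy' => ['S', 'all']), with 'S' for strings and 'I' for integers; buildQueueProperties() is a hypothetical helper, not part of the package.

```php
<?php

declare(strict_types=1);

/**
 * Build a queue_properties array from optional settings.
 * Hypothetical helper for illustration only. The [type, value] pairs
 * follow the form used in the default config (assumed: 'S' = string, 'I' = integer).
 */
function buildQueueProperties(
    ?string $deadLetterExchange = null,
    ?int $messageTtlMs = null,
    ?int $maxPriority = null
): array {
    $properties = [];

    if ($deadLetterExchange !== null) {
        // Rejected or expired messages are re-routed to this exchange.
        $properties['x-dead-letter-exchange'] = ['S', $deadLetterExchange];
    }

    if ($messageTtlMs !== null) {
        // Per-queue message TTL in milliseconds.
        $properties['x-message-ttl'] = ['I', $messageTtlMs];
    }

    if ($maxPriority !== null) {
        // Highest priority value the queue will honour.
        $properties['x-max-priority'] = ['I', $maxPriority];
    }

    return $properties;
}

// Example: a queue with a dead letter exchange, 60 second TTL, and 10 priority levels.
$queueProperties = buildQueueProperties('dlx', 60000, 10);
```

The resulting array could then be supplied as the queue_properties value of a connection; whether every argument is accepted depends on the queue type and RabbitMQ version in use.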
- PHP 8.1 or higher
- Laravel 8.x / 9.x / 10.x / 11.x or Lumen 8.x / 9.x / 10.x
- RabbitMQ 3.x (tested with the rabbitmq:3-management Docker image)
composer require bschmitt/laravel-amqp
For Laravel 5.5+:
"bschmitt/laravel-amqp": "^3.1"
For Laravel < 5.5:
"bschmitt/laravel-amqp": "^2.0"
use Bschmitt\Amqp\Facades\Amqp;
// Basic publish
Amqp::publish('routing-key', 'message');
// Publish with queue creation
Amqp::publish('routing-key', 'message', ['queue' => 'queue-name']);
// Publish with message properties
Amqp::publish('routing-key', 'message', [
'priority' => 10,
'correlation_id' => 'unique-id',
'reply_to' => 'reply-queue',
'application_headers' => [
'X-Custom-Header' => 'value'
]
]);
use Bschmitt\Amqp\Facades\Amqp;
// Consume and acknowledge (using dynamic call)
$amqp = app('Amqp');
$amqp->consume('queue-name', function ($message, $resolver) {
echo $message->body;
$resolver->acknowledge($message);
$resolver->stopWhenProcessed();
});
// Consume forever
$amqp = app('Amqp');
$amqp->consume('queue-name', function ($message, $resolver) {
processMessage($message->body);
$resolver->acknowledge($message);
}, ['persistent' => true]);
// Alternative: Using resolve() helper
$amqp = resolve('Amqp');
$amqp->consume('queue-name', function ($message, $resolver) {
processMessage($message->body);
$resolver->acknowledge($message);
});
// Client side - Make RPC call (using dynamic call)
$amqp = app('Amqp');
$response = $amqp->rpc('rpc-queue', 'request-data', [], 30);
// Server side - Process and reply (using dynamic call)
$amqp = app('Amqp');
$amqp->consume('rpc-queue', function ($message, $resolver) {
$result = processRequest($message->body);
$resolver->reply($message, $result);
$resolver->acknowledge($message);
});
$amqp = app('Amqp');
$amqp->listen(['key1', 'key2', 'key3'], function ($message, $resolver) {
processMessage($message->body);
$resolver->acknowledge($message);
});
Publish the configuration file:
php artisan vendor:publish --provider="Bschmitt\Amqp\Providers\AmqpServiceProvider"
Or manually copy vendor/bschmitt/laravel-amqp/config/amqp.php to config/amqp.php.
Create a config folder in your Lumen root and copy the configuration file:
mkdir config
cp vendor/bschmitt/laravel-amqp/config/amqp.php config/amqp.php
Register the service provider in bootstrap/app.php:
$app->configure('amqp');
$app->register(Bschmitt\Amqp\Providers\LumenServiceProvider::class);
// For Lumen 5.2+, enable facades
$app->withFacades(true, [
'Bschmitt\Amqp\Facades\Amqp' => 'Amqp',
]);
return [
'use' => 'production',
'properties' => [
'production' => [
'host' => env('AMQP_HOST', 'localhost'),
'port' => env('AMQP_PORT', 5672),
'username' => env('AMQP_USER', 'guest'),
'password' => env('AMQP_PASSWORD', 'guest'),
'vhost' => env('AMQP_VHOST', '/'),
'exchange' => env('AMQP_EXCHANGE', 'amq.topic'),
'exchange_type' => env('AMQP_EXCHANGE_TYPE', 'topic'),
'consumer_tag' => 'consumer',
'ssl_options' => [],
'connect_options' => [],
'queue_properties' => ['x-ha-policy' => ['S', 'all']],
'exchange_properties' => [],
'timeout' => 0,
// Management API (optional)
'management_api_url' => env('AMQP_MANAGEMENT_URL', 'http://localhost:15672'),
'management_api_user' => env('AMQP_MANAGEMENT_USER', 'guest'),
'management_api_password' => env('AMQP_MANAGEMENT_PASSWORD', 'guest'),
],
],
];
- User Manual - Complete usage guide
- Release Notes - Version 3.1.0 changelog
- FAQ - Common questions and answers
- Getting Started - Installation and first steps
- Configuration - Configuration guide
- Publishing Messages - Publishing guide
- Consuming Messages - Consumption guide
- RPC Pattern - Request-response patterns
- Queue Management - Queue operations
- Management API - HTTP API integration
- Message Properties - Message properties
- Advanced Features - Advanced usage
- Architecture - Package architecture
- Testing - Testing guide
See docs/modules/ for detailed module documentation:
- RPC Module
- Management Operations
- Management API
- Message Properties
- Consumer Prefetch
- And more...
// Publishing
Amqp::publish('', 'message', [
'exchange_type' => 'fanout',
'exchange' => 'amq.fanout',
]);
// Consuming (using dynamic call)
$amqp = app('Amqp');
$amqp->consume('', function ($message, $resolver) {
echo $message->body;
$resolver->acknowledge($message);
}, [
'routing' => '',
'exchange' => 'amq.fanout',
'exchange_type' => 'fanout',
'queue_force_declare' => true,
'queue_exclusive' => true,
'persistent' => true
]);
// Get Amqp instance
$amqp = app('Amqp');
// Purge queue
$amqp->queuePurge('my-queue', ['queue' => 'my-queue']);
// Delete queue
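// Arguments follow the documented signature: queueDelete($queue, $ifUnused, $ifEmpty, $properties)
$amqp->queueDelete('my-queue', false, false, ['queue' => 'my-queue']);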
// Get queue statistics
$stats = $amqp->getQueueStats('my-queue', '/');
// Get Amqp instance
$amqp = app('Amqp');
// Get queue statistics
$stats = $amqp->getQueueStats('my-queue', '/');
// List connections
$connections = $amqp->getConnections();
// Create policy
$amqp->createPolicy('my-policy', [
'pattern' => '^my-queue$',
'definition' => ['max-length' => 1000]
], '/');
The package includes comprehensive test coverage:
# Run all tests
php vendor/bin/phpunit
# Run unit tests only
php vendor/bin/phpunit test/Unit/
# Run integration tests only
php vendor/bin/phpunit test/Integration/
Test Requirements:
- RabbitMQ server running (for integration tests)
- Docker:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
See Testing Guide for more information.
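Because the integration tests need a running broker, it can help to check that the AMQP port is open before starting them. The following is a minimal sketch in plain PHP; isBrokerReachable() is a hypothetical helper (not part of the package), and the host and port are assumed to match the Docker command above.

```php
<?php

declare(strict_types=1);

/**
 * Return true if a TCP connection to the broker port can be opened.
 * Hypothetical helper for illustration; it only checks that the port
 * accepts connections, not that the AMQP handshake succeeds.
 */
function isBrokerReachable(
    string $host = '127.0.0.1',
    int $port = 5672,
    float $timeoutSeconds = 1.0
): bool {
    // fsockopen() raises a warning on failure; suppress it because
    // only a yes/no answer is needed here.
    $socket = @fsockopen($host, $port, $errorCode, $errorMessage, $timeoutSeconds);

    if ($socket === false) {
        return false;
    }

    fclose($socket);

    return true;
}
```

A test bootstrap could call isBrokerReachable() and skip the integration suite when it returns false, so that unit tests still run on machines without RabbitMQ.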
RPC:
- $amqp->rpc($routingKey, $request, $properties, $timeout) - Make RPC calls (use $amqp = app('Amqp'))
- Consumer::reply($message, $response, $properties) - Send RPC responses
- $amqp->listen($routingKeys, $callback, $properties) - Auto-create queues with multiple bindings (use $amqp = app('Amqp'))
Management:
- $amqp->queuePurge($queue, $properties) - Purge queue (use $amqp = app('Amqp'))
- $amqp->queueDelete($queue, $ifUnused, $ifEmpty, $properties) - Delete queue
- $amqp->queueUnbind(...) - Unbind queue
- $amqp->exchangeDelete(...) - Delete exchange
- $amqp->exchangeUnbind(...) - Unbind exchange
Management API:
- $amqp->getQueueStats($queue, $vhost, $properties) - Queue statistics (use $amqp = app('Amqp'))
- $amqp->getConnections($connectionName, $properties) - List connections
- $amqp->getChannels($channelName, $properties) - List channels
- $amqp->getNodes($nodeName, $properties) - Cluster nodes
- $amqp->getPolicies($properties) - List policies
- $amqp->createPolicy(...) - Create policy
- $amqp->updatePolicy(...) - Update policy
- $amqp->deletePolicy(...) - Delete policy
- $amqp->listFeatureFlags($properties) - List feature flags
- $amqp->getFeatureFlag($name, $properties) - Get feature flag
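The Management API methods take a vhost such as '/'. In the RabbitMQ Management HTTP API itself, the vhost appears as a URL path segment and must be percent-encoded, so '/' becomes %2F. The sketch below shows that encoding in plain PHP; managementUrl() is a hypothetical helper for illustration and is not claimed to be how the package builds its requests.

```php
<?php

declare(strict_types=1);

/**
 * Build a RabbitMQ Management HTTP API URL from path segments.
 * Hypothetical helper: every segment is percent-encoded, which turns
 * the default vhost "/" into "%2F".
 */
function managementUrl(string $baseUrl, string ...$segments): string
{
    $encoded = array_map('rawurlencode', $segments);

    return rtrim($baseUrl, '/') . '/api/' . implode('/', $encoded);
}

// Queue statistics for "my-queue" on the default vhost.
echo managementUrl('http://localhost:15672', 'queues', '/', 'my-queue'), PHP_EOL;

// Policy resource for "my-policy" on the default vhost.
echo managementUrl('http://localhost:15672', 'policies', '/', 'my-policy'), PHP_EOL;
```

This is mainly useful when checking results by hand, for example comparing the output of getQueueStats() with the same resource fetched from the management UI host.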
Helpers:
- $amqp->getConnectionConfig($connectionName) - Get connection config (use $amqp = app('Amqp'))
Note: For consume(), listen(), rpc(), and all management methods, you must resolve the Amqp instance from the container using $amqp = app('Amqp') or $amqp = resolve('Amqp'). The static facade Amqp:: works for publish() but not for consume() and other instance methods.
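The distinction in the note can be sketched as two small functions, one per access style. This assumes a booted Laravel or Lumen application with the package installed; the function names, routing key, and queue name are hypothetical, and only calls already shown in this README are used.

```php
<?php

use Bschmitt\Amqp\Facades\Amqp;

/**
 * Publishing: the static facade is sufficient.
 * 'order.created' is a hypothetical routing key.
 */
function publishOrderCreated(string $payload): void
{
    Amqp::publish('order.created', $payload);
}

/**
 * Consuming: resolve the Amqp instance from the container first.
 * 'orders' is a hypothetical queue name.
 */
function consumeOrders(): void
{
    $amqp = app('Amqp');

    $amqp->consume('orders', function ($message, $resolver) {
        echo $message->body, PHP_EOL;
        $resolver->acknowledge($message);
        // Stop once the queue has been drained instead of waiting forever.
        $resolver->stopWhenProcessed();
    });
}
```

Keeping the two styles in separate functions makes it harder to call consume() on the facade by accident.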
Version 3.1.0 is fully backward compatible with previous versions. All existing code will continue to work without modifications.
Contributions are welcome! Please feel free to submit a Pull Request.
- Some concepts were used from mookofe/tail
- Built and tested with the rabbitmq:3-management Docker image
This package is open-sourced software licensed under the MIT license.
For issues, questions, or contributions:
- GitHub Issues: https://github.com/bschmitt/laravel-amqp/issues
- Documentation: See docs/ directory
- FAQ: docs/laravel-amqp.wiki/FAQ.md
Version: 3.1.0
Status: Ready