Symfony RabbitMqBundle

Symfony de RabbitMQ Servsine php-amqplib kütüphanesi ile bağlantısını anlatacağım.

Rabbit bundle da Thumperin php-amqlib kütüphanesini kullanıyoruz.

Örnek:

$msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
$this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg));

50 tane mesaj istegi oldugunda aşagıdaki komutu çalıştırın.

$ ./app/console rabbitmq:consumer -m 50 upload_picture

RabbitMq server kurulumunuzu yaptıktan sonra bundle kuruyoruz.
Kurulum
Composer ile kurun:

$ composer require php-amqplib/rabbitmq-bundle

Bunldle Kayıt edin:
// app/AppKernel.php

public function registerBundles()
{
    $bundles = array(
        new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(),
    );
}

Kurulum bitti şimdi ayarlarını yapmakta.

Composer da kalması gereken: (Lütfen silmeyin)

{
    "require": {
        "php-amqplib/rabbitmq-bundle": "~1.6",
    }
}

extension ve compiler ekleyin:

use OldSound\RabbitMqBundle\DependencyInjection\OldSoundRabbitMqExtension;
use OldSound\RabbitMqBundle\DependencyInjection\Compiler\RegisterPartsPass;

// ...

$containerBuilder->registerExtension(new OldSoundRabbitMqExtension());
$containerBuilder->addCompilerPass(new RegisterPartsPass());

Kullanım:

Confirasyon dosyanıza ekleyin:

old_sound_rabbit_mq:
    connections:
        default:
            host:     'localhost'
            port:     5672
            user:     'guest'
            password: 'guest'
            vhost:    '/'
            lazy:     false
            connection_timeout: 3
            read_write_timeout: 3

            # requires php-amqplib v2.4.1+ and PHP5.4+
            keepalive: false

            # requires php-amqplib v2.4.1+
            heartbeat: 0

            #requires php_sockets.dll
            use_socket: true # default false
        another:
            # A different (unused) connection defined by an URL. One can omit all parts,
            # except the scheme (amqp:). If both segment in the URL and a key value (see above)
            # are given the value from the URL takes precedence.
            # See https://www.rabbitmq.com/uri-spec.html on how to encode values.
            url: 'amqp://guest:password@localhost:5672/vhost?lazy=1&connection_timeout=6'
    producers:
        upload_picture:
            connection:       default
            exchange_options: {name: 'upload-picture', type: direct}
            service_alias:    my_app_service # no alias by default
    consumers:
        upload_picture:
            connection:       default
            exchange_options: {name: 'upload-picture', type: direct}
            queue_options:    {name: 'upload-picture'}
            callback:         upload_picture_service

old_sound_rabbit_mq.upload_picture_producerve old_sound_rabbit_mq.upload_picture_consumer. sonra kullanılmak üzere beklemede olan servisler. bu servisleri upload_picture_service ile cagırabilirsiniz.

İsteğe Bağlı Kuyruk İşlemleri:

queue_options: {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}

20 saniyelik TTL mesajı örnegi

queue_options: {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}

Değişken Türleri:

    S – String
    I – Integer
    D – Decimal
    T – Timestamps
    F – Table
    A – Array

arguments isteğinize göre düzenleyebilirsiniz.

Belirli koşullara göre yaptırmak isterseniz:

queue_options:
    name: "upload-picture"
    routing_keys:
      - 'android.#.upload'
      - 'iphone.upload'

Önemli Uyarı:
2.3 üstü için geçerlidir bu yazılar eski versiyonlarda çalışmaz.

RabbitMQ server bağlantı ayarları:

connections:
    default:
        host:     'localhost'
        port:     5672
        user:     'guest'
        password: 'guest'
        vhost:    'foo' # to be dynamically overridden by <code>connection_parameters_provider</code>
        connection_parameters_provider: connection_parameters_provider_service

Örnek Uygulama:

class ConnectionParametersProviderService implements ConnectionParametersProvider {
    ...
    public function getConnectionParameters() {
        return array('vhost' => $this->getVhost());
    }
    ...
}

Producers, Consumers, Nedir?

Producers:
Gönderme sürecinde yapılan işlemlere denir.

Consumers:
Gönderilen verileri alıp işleyen birime denir.

Yüklenen resmi arkaplanda işlem yaptırmak istediğinizi varsayarsak:

public function indexAction($name)
{
    $msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
    $this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg));
}

Producers burada upload_picture

Servis burada old_sound_rabbit_mq.upload_picture_producer

setContentType ve setDeliveryMode yöntemlerini kullanabilirsiniz.

$this->get('old_sound_rabbit_mq.upload_picture_producer')->setContentType('application/json');

OldSound\RabbitMqBundle\RabbitMq\Producer dan miras alarak aşagıdaki classı kullanabilirsiniz.

 ...
    producers:
        upload_picture:
            class: My\Custom\Producer
            connection: default
            exchange_options: {name: 'upload-picture', type: direct}
    ...

Bundan sonraki işlemlde artık Consumers kısmına geçtik.
Burada iletilere göre işlemlere başlayacaktır.

consumers:
    upload_picture:
        connection:       default
        exchange_options: {name: 'upload-picture', type: direct}
        queue_options:    {name: 'upload-picture'}
        callback:         upload_picture_service

Görüldügü gibi callback bölümü mevcuttur işlemler bittiginde callback ile dönüş yapacaktır.

customer çalıştıgına emin olabilmeniz için aşagıdaki kodu çalıştırabilirsiniz:

$ ./app/console rabbitmq:consumer -m 50 upload_picture
$ ./app/console rabbitmq:consumer -w upload_picture

Bu işlemleri yapabilmeniz için PCNTL kütüphanesine ihtiyacınız vardır.

256 MB bellek sınırı vermek için:

$ ./app/console rabbitmq: consumer -l 256

Sıraya alınan bütün iletileri simek için

$ ./app/console rabbitmq: tasfiye - no-confirmation upload_picture

Tüketici kuyrugunu silmek için:

$ ./app/console rabbitmq: delete --no-onayı upload_picture

Yazar: admin