RabbitMQ, AMQP

2023. 6. 7. 15:57Cloud Native

    목차
반응형

from rabbitmq.com

AMQP

Advanced Message Queing Protocol
Message Queue를 통해 데이터를 주고받는 프로토콜을 정의합니다.

MQ들에 대한 표준의 부재로 인한 conversion의 overhead 등의 문제를 보완하고자 AMQP가 등장했습니다.
표준화 된 동작방식을 따라야 하기에 AMQP는 다음의 조건을 충족해야 합니다.

  • 모든 broker들이 동일 방식으로 동작
  • 모든 client들이 동일 방식으로 동작
  • 명령어의 표준화
  • 언어 중립적

AMQP의 구성 요소

  • Exchange
    • publisher가 송신한 메시지를 수신하여 적절한 queue 혹은 다른 exchange로 분배하는 Router 역할을 수행합니다.
    • Exchange는 다른 Excahnge 혹은 Queue와 binding 되어 있습니다.
    • binding 과 message의 matching은 exchange type으로 수행합니다. 이는 routing algorithm의 class 입니다.
    • broker는 여러 exchange type 인스턴스를 가질 수 있습니다.
      • binding: 어떤 message를 어떤 queue에 보낼지에 대한 것
      • exchange type: message를 어떤 방법으로 routing 할지에 대한 것
      • 즉, exchange는 어떤 type 을 어떤 방식으로 route 할지, binding은 route 된 것을 어느 queue, exchange로 보낼지 결정
  • Queue
    • binding을 통해 exchange에 연결
  • Binding
    • message 연결에 대한 일종의 routing table
    • 하나의 queueu가 여러 exchange에 bind 될 수 있음 (반대도 가능)
  • Routing key
    • message header에 위치한 가상 주소
    • exchange는 routing key로 어떤 queue로 message를 route 할지 결정
    • AMQP의 표준 exchange type은 routing key를 이용하도록 되어 있음

Standard Exchange Type

 

  • 대부분의 MQ에서 가능한 여러 사오항에 대해 AMQP에 정의한 표준 라우팅 알고리즘
  • Direct Exchange 
    • Message에 routing key가 있음
    • 이 key로 여러 queue에 matching
      • routing key와 동일 binding 이름을 갖는 queue(들)로 message를 전달
  • Topic exchange
    • wild card로 queue를 matching
      • *은 1개 이상, #은 0개 이상 발생 시 matching

  • Fanout exchange
    • message를 모든 queue로 broadcsting
  • Headers exchange
    • key, value로 정의된 값을 통해 routing을 결정
    • message는 여러 key, value를 지닐 수 있으며 각 k-v를 AND, OR 조건으로 결정하여 연결

Golang에서 RabbitMQ 사용

  • Producer가 보낸 메시지를
  • Exchange가 어떤 Exchange 혹은 Queue로 routing할지 결정해 전달
  • Consumer가 queue에서 메시지를 수신

설치

ampq 설치

go get github.com/rabbitmq/ampq091-go

test code

producer

package main

import (
    "log"
    amqp "github.com/rabbitmq/ampq091-go"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    defer conn.Close()

    ch, err := conn.Channel()
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "queue1",
        false,        // durable
        false,         // delete when unused
        false,         // exclusive
        false,        // no-wait
        nil,        // arguments
    )

    content := "test message"

    err = ch.Publish(
        "",            // exchange
        q.Name,        
        false,        // mandatory
        false,        // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(content),
        }
    )    
}
    1. conn = amqp.Dial
    1. ch = conn.Channel()

    1. ch.QueueDeclare("queue_name", ...)

    1. ch.Publish("", q.Name, ..., amqp.Publishing{ ContentType: "text/plain", Body: []byte("test message")})

consumer

package main

import (
    "log"
    amqp "github.com/rabbitmq/ampq091-go"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")    // message queue URL
    defer conn.Close()

    ch, err := conn.Channel()
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "queue1",
        false,        // durable
        false,         // delete when unused
        false,         // exclusive
        false,        // no-wait
        nil,        // arguments
    )

    content := "test message"

    msgs, err := ch.Consume(
        q.Name,        
        "",         // consumer
        true,        // auto-ACK
        false,        // exclusive
        false,        // no-local
        false,        // no-wait
        nil,        // args
    )

    var waitChannel chan interface{}

    go func() {
        for msg := range msgs {
            log.Printf("msg = %s", msg.body)
        }
    }

    <-waitChannel
}
  • Dial
    • TCP로 연결(session)을 만들어 연결
    • MQ가 위치하는 RabbitMQ server의 URL을 기입
      • MQ Cluster의 URL 지정

데이터 송/수신 방법

 

 

all the pictures in the article are from https://www.rabbitmq.com/tutorials/amqp-concepts.html

반응형