Cloud Native

RabbitMQ, AMQP

Roiei 2023. 6. 7. 15:57
반응형

from rabbitmq.com

AMQP

Advanced Message Queing Protocol
Message Queue를 통해 데이터를 주고받는 프로토콜을 정의합니다.

MQ들에 대한 표준의 부재로 인한 conversion의 overhead 등의 문제를 보완하고자 AMQP가 등장했습니다.
표준화 된 동작방식을 따라야 하기에 AMQP는 다음의 조건을 충족해야 합니다.

  • 모든 broker들이 동일 방식으로 동작
  • 모든 client들이 동일 방식으로 동작
  • 명령어의 표준화
  • 언어 중립적

AMQP의 구성 요소

  • Exchange
    • publisher가 송신한 메시지를 수신하여 적절한 queue 혹은 다른 exchange로 분배하는 Router 역할을 수행합니다.
    • Exchange는 다른 Excahnge 혹은 Queue와 binding 되어 있습니다.
    • binding 과 message의 matching은 exchange type으로 수행합니다. 이는 routing algorithm의 class 입니다.
    • broker는 여러 exchange type 인스턴스를 가질 수 있습니다.
      • binding: 어떤 message를 어떤 queue에 보낼지에 대한 것
      • exchange type: message를 어떤 방법으로 routing 할지에 대한 것
      • 즉, exchange는 어떤 type 을 어떤 방식으로 route 할지, binding은 route 된 것을 어느 queue, exchange로 보낼지 결정
  • Queue
    • binding을 통해 exchange에 연결
  • Binding
    • message 연결에 대한 일종의 routing table
    • 하나의 queueu가 여러 exchange에 bind 될 수 있음 (반대도 가능)
  • Routing key
    • message header에 위치한 가상 주소
    • exchange는 routing key로 어떤 queue로 message를 route 할지 결정
    • AMQP의 표준 exchange type은 routing key를 이용하도록 되어 있음

Standard Exchange Type

 

  • 대부분의 MQ에서 가능한 여러 사오항에 대해 AMQP에 정의한 표준 라우팅 알고리즘
  • Direct Exchange 
    • Message에 routing key가 있음
    • 이 key로 여러 queue에 matching
      • routing key와 동일 binding 이름을 갖는 queue(들)로 message를 전달
  • Topic exchange
    • wild card로 queue를 matching
      • *은 1개 이상, #은 0개 이상 발생 시 matching

  • Fanout exchange
    • message를 모든 queue로 broadcsting
  • Headers exchange
    • key, value로 정의된 값을 통해 routing을 결정
    • message는 여러 key, value를 지닐 수 있으며 각 k-v를 AND, OR 조건으로 결정하여 연결

Golang에서 RabbitMQ 사용

  • Producer가 보낸 메시지를
  • Exchange가 어떤 Exchange 혹은 Queue로 routing할지 결정해 전달
  • Consumer가 queue에서 메시지를 수신

설치

ampq 설치

go get github.com/rabbitmq/ampq091-go

test code

producer

package main

import (
    "log"
    amqp "github.com/rabbitmq/ampq091-go"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    defer conn.Close()

    ch, err := conn.Channel()
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "queue1",
        false,        // durable
        false,         // delete when unused
        false,         // exclusive
        false,        // no-wait
        nil,        // arguments
    )

    content := "test message"

    err = ch.Publish(
        "",            // exchange
        q.Name,        
        false,        // mandatory
        false,        // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(content),
        }
    )    
}
    1. conn = amqp.Dial
    1. ch = conn.Channel()

    1. ch.QueueDeclare("queue_name", ...)

    1. ch.Publish("", q.Name, ..., amqp.Publishing{ ContentType: "text/plain", Body: []byte("test message")})

consumer

package main

import (
    "log"
    amqp "github.com/rabbitmq/ampq091-go"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")    // message queue URL
    defer conn.Close()

    ch, err := conn.Channel()
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "queue1",
        false,        // durable
        false,         // delete when unused
        false,         // exclusive
        false,        // no-wait
        nil,        // arguments
    )

    content := "test message"

    msgs, err := ch.Consume(
        q.Name,        
        "",         // consumer
        true,        // auto-ACK
        false,        // exclusive
        false,        // no-local
        false,        // no-wait
        nil,        // args
    )

    var waitChannel chan interface{}

    go func() {
        for msg := range msgs {
            log.Printf("msg = %s", msg.body)
        }
    }

    <-waitChannel
}
  • Dial
    • TCP로 연결(session)을 만들어 연결
    • MQ가 위치하는 RabbitMQ server의 URL을 기입
      • MQ Cluster의 URL 지정

데이터 송/수신 방법

 

 

all the pictures in the article are from https://www.rabbitmq.com/tutorials/amqp-concepts.html

반응형