RabbitMQ, AMQP
2023. 6. 7. 15:57ㆍCloud Native
- 목차
반응형
AMQP
Advanced Message Queing Protocol
Message Queue를 통해 데이터를 주고받는 프로토콜을 정의합니다.
MQ들에 대한 표준의 부재로 인한 conversion의 overhead 등의 문제를 보완하고자 AMQP가 등장했습니다.
표준화 된 동작방식을 따라야 하기에 AMQP는 다음의 조건을 충족해야 합니다.
- 모든 broker들이 동일 방식으로 동작
- 모든 client들이 동일 방식으로 동작
- 명령어의 표준화
- 언어 중립적
AMQP의 구성 요소
- Exchange
- publisher가 송신한 메시지를 수신하여 적절한 queue 혹은 다른 exchange로 분배하는 Router 역할을 수행합니다.
- Exchange는 다른 Excahnge 혹은 Queue와 binding 되어 있습니다.
- binding 과 message의 matching은 exchange type으로 수행합니다. 이는 routing algorithm의 class 입니다.
- broker는 여러 exchange type 인스턴스를 가질 수 있습니다.
- binding: 어떤 message를 어떤 queue에 보낼지에 대한 것
- exchange type: message를 어떤 방법으로 routing 할지에 대한 것
- 즉, exchange는 어떤 type 을 어떤 방식으로 route 할지, binding은 route 된 것을 어느 queue, exchange로 보낼지 결정
- Queue
- binding을 통해 exchange에 연결
- Binding
- message 연결에 대한 일종의 routing table
- 하나의 queueu가 여러 exchange에 bind 될 수 있음 (반대도 가능)
- Routing key
- message header에 위치한 가상 주소
- exchange는 routing key로 어떤 queue로 message를 route 할지 결정
- AMQP의 표준 exchange type은 routing key를 이용하도록 되어 있음
Standard Exchange Type
- 대부분의 MQ에서 가능한 여러 사오항에 대해 AMQP에 정의한 표준 라우팅 알고리즘
- Direct Exchange
- Message에 routing key가 있음
- 이 key로 여러 queue에 matching
- routing key와 동일 binding 이름을 갖는 queue(들)로 message를 전달
- Topic exchange
- wild card로 queue를 matching
- *은 1개 이상, #은 0개 이상 발생 시 matching
- wild card로 queue를 matching
- Fanout exchange
- message를 모든 queue로 broadcsting
- Headers exchange
- key, value로 정의된 값을 통해 routing을 결정
- message는 여러 key, value를 지닐 수 있으며 각 k-v를 AND, OR 조건으로 결정하여 연결
Golang에서 RabbitMQ 사용
- Producer가 보낸 메시지를
- Exchange가 어떤 Exchange 혹은 Queue로 routing할지 결정해 전달
- Consumer가 queue에서 메시지를 수신
설치
ampq 설치
go get github.com/rabbitmq/ampq091-go
test code
producer
package main
import (
"log"
amqp "github.com/rabbitmq/ampq091-go"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
ch, err := conn.Channel()
defer ch.Close()
q, err := ch.QueueDeclare(
"queue1",
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
content := "test message"
err = ch.Publish(
"", // exchange
q.Name,
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(content),
}
)
}
-
- conn = amqp.Dial
-
- ch = conn.Channel()
-
- ch.QueueDeclare("queue_name", ...)
-
- ch.Publish("", q.Name, ..., amqp.Publishing{ ContentType: "text/plain", Body: []byte("test message")})
consumer
package main
import (
"log"
amqp "github.com/rabbitmq/ampq091-go"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") // message queue URL
defer conn.Close()
ch, err := conn.Channel()
defer ch.Close()
q, err := ch.QueueDeclare(
"queue1",
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
content := "test message"
msgs, err := ch.Consume(
q.Name,
"", // consumer
true, // auto-ACK
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
var waitChannel chan interface{}
go func() {
for msg := range msgs {
log.Printf("msg = %s", msg.body)
}
}
<-waitChannel
}
- Dial
- TCP로 연결(session)을 만들어 연결
- MQ가 위치하는 RabbitMQ server의 URL을 기입
- MQ Cluster의 URL 지정
데이터 송/수신 방법
all the pictures in the article are from https://www.rabbitmq.com/tutorials/amqp-concepts.html
반응형
'Cloud Native' 카테고리의 다른 글
Jaeger (예거) (0) | 2023.06.26 |
---|---|
Docker image의 deploy를 위한 특정 library에 대한 의존성 관리 (0) | 2023.06.20 |
CSR (Certificate Signing Request), SSC (Self Signed Certificate) (0) | 2023.01.19 |
카나리 (Canary) 배포 (Release) (0) | 2023.01.19 |
쿠버네티스 (Kubenetes, K8S) (0) | 2023.01.19 |