Dev/ETC

[MQ] RabbitMQ Publish Subscribe 예제

헝그리둘기 2022. 12. 2. 15:23

RabbitMQ설치서버(로컬PC) 환경: Windows10, RabbitMQ 3.9.13, jdk1.7.0_80

예제 패키지 환경: SpringBoot 2.6.3, jdk1.8

 

프로젝트에 메시지를 목적지까지 전달해주는 메시지 브로커(집배원)를 적용해야 할 일이 생겨 RabbitMQ를 선택했고 학습을 위해 간단한 Publish / Subscribe 구조의 예제를 만들어 보았다. https://www.rabbitmq.com/ 공식 홈페이지에 설치방법은 물론 프로그래밍 언어별로 튜토리얼이 잘 나와있다.

예제 코드를 보기 전에 큐에 메시지를 담고 꺼내는 과정을 간단히 설명하자면,

1. publisher가 메시지를 어떤 큐로 보낼지 결정하는 라우터의 역할을 하는 exchange에게 메시지 전송

2. exchange에서 알맞은 queue로 메시지 적재

3. queue를 구독하고있던 consumer가 메시지를 꺼내봄

정도가 되겠다.

 

1. PC에 RabbitMQ 설치 (erlang). 설치후 실행까지 잘 된다면 5672포트가 열려있을 것이고, 브라우저에서 http://localhost:15672/로 관리자 인터페이스 접근 가능 (디폴트 계정은 guest / guest)

2. 예제 프로젝트에서 rabbitmq 라이브러리 추가

3. MQ서버에 연결해 메시지를 보내고 받는 MQTest클래스와, 실행클래스(Main)를 생성해 테스트. 아래 예제 참고

@Slf4j
public class MQTest {
    private Channel channel = null;

    /**
     * MQ 서버연결, 채널생성
     */
    public void connectChannel() throws Exception {
        String mqUri = "amqp://계정:비밀번호@localhost:5672";  // MQ가 설치되어 실행중인 서버

        ConnectionFactory factory = new ConnectionFactory();  // 팩토리 객체 생성
        factory.setUri(mqUri);  // MQ 서버와 연결
        Connection connection = factory.newConnection();  // 커넥션 객체 생성

        this.channel = connection.createChannel();
    }

    /**
     * 메시지 적재 (Publish)
     *
     * @param requestDataMap 요청 메시지 맵
     */
    public void publish(Map<String, Object> requestDataMap) throws Exception {
        // 채널 세팅
        String exchangeName = "exchange.test.direct";  // exchange 이름
        String exchangeType = "direct";  // exchange 타입 (fanout:바인딩된 모든 큐로 전송, direct:라우팅키를 통해 특정 큐로만 전송)
        String requestQueue = "queue.test";  // 요청 메시지가 적재될 큐 이름
        String routingKey = "01";  // exchange에 들어온 메시지를 어느 queue에 적재할지 판단하는 라우팅 키값
        boolean isDurable = true;  // mq서버 재시작해도 제거 안할건지
        boolean isExclusive = false;  // 커넥션 종료되면 제거 할건지
        boolean isAutoDelete = false;  // 큐에 붙어있는 컨슈머가 한개도 없으면 제거 할건지

        this.channel.exchangeDeclare(exchangeName, exchangeType, isDurable);  // exchange 지정 (없으면 생성)
        this.channel.queueDeclare(requestQueue, isDurable, isExclusive, isAutoDelete, null).getQueue();  // queue 지정 (없으면 생성)
        this.channel.queueBind(requestQueue, exchangeName, routingKey);  // exchange <-> queue 바인딩

        // 적재 속성 객체 빌드
        AMQP.BasicProperties requestProps = new AMQP.BasicProperties
                .Builder()
                .contentType("application/json")  // 메시지 타입 json
                .deliveryMode(2)  // RabbitMQ 서버가 재실행되도 큐에 적재된 메시지 유지
                .build();

        // 메시지 세팅
        ObjectMapper objectMapper = new ObjectMapper();  // json <-> java object 변환 객체
        String requestMessage = objectMapper.writeValueAsString(requestDataMap);  // 메시지를 json형태의 string으로 변환

        this.channel.basicPublish(exchangeName, routingKey, requestProps, requestMessage.getBytes(StandardCharsets.UTF_8));  // 적재
        log.debug("[X] {} [RK] {} <- '{}' 메시지를 보냈습니다", exchangeName, routingKey, requestMessage);
    }

    /**
     * 메시지 소비 (Consume)
     */
    public void consumeMacroReplyQueue() throws Exception {
        String queueToConsume = "queue.test";  // 소비할 큐 이름
        this.channel.basicQos(1);  // 한 컨슈머에게 메시지를 한개씩만 전달한다
        boolean autoAck = false;  // 소비 시 바로 메시지 제거할지

        // 소비 시 실행될 콜백함수
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // 큐에대한 정보 확인
            AMQP.Queue.DeclareOk queue = this.channel.queueDeclarePassive(queueToConsume);
            log.info("[Q] {} - 적재된 메시지 총 {} 개", queue.getQueue(), queue.getMessageCount() + 1);

            // 요청받은 메시지
            String requestMessage = new String(delivery.getBody(), StandardCharsets.UTF_8);
            log.info("[Q] {} -> '{}' 메시지를 받았습니다", queueToConsume, requestMessage);

            // 실패, 성공에 관계없이 소비에 대한 ack를 보내 다음 메시지를 받음
            this.channel.basicAck(delivery.getEnvelope().getDeliveryTag(), autoAck);
        };

        // 소비
        this.channel.basicConsume(queueToConsume, autoAck, deliverCallback, consumerTag -> {
        });
    }
}
public class MQTestMain {
    public static void main(String[] args) {
        try {
            MQTest mqTest = new MQTest();
            mqTest.connectChannel();
            
            // 요청 메시지 세팅
            Map<String, Object> requestHashMap = new HashMap<>();
            requestHashMap.put("data01", 123);
            requestHashMap.put("data02", "test123");
            
            mqTest.publish(requestHashMap);  // 적재
            mqTest.consumeMacroReplyQueue();  // 소비
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

콘솔에 찍힌 main메서드의 결과값 

[2022-12-02 15:14:41.563] DEBUG MQTest:63 - [X] exchange.test.direct [RK] 01 <- '{"data01":123,"data02":"test123"}' 메시지를 보냈습니다
[2022-12-02 15:14:41.572] INFO  MQTest:78 - [Q] queue.test - 적재된 메시지 총 1 개
[2022-12-02 15:14:41.572] INFO  MQTest:82 - [Q] queue.test -> '{"data01":123,"data02":"test123"}' 메시지를 받았습니다