rabbitmq tutotial - work queue
들어가며..
이전의 Hello World 예제에서는 큐하나에 하나의 컨슈머가 있고 단순히 큐에 보낸 메시지를 컨슈머 하나가 작업하는 단순한 예제를 했었습니다.
이번에는 큐하나에 여러개의 컨슈머를 배치해서 작업을 병렬적으로 해결하는 법을 알아 봅시다.
하나의 worker프로세스는 백그라운드에서 동작 할 것이며 작업을 큐에서 하나 꺼내와 처리할 것 입니다.
만약 여러개의 worker를 사용한다면 큐안의 작업들을 공유하여 해결할 것 입니다.
이러한 개념은 특히 짧은 HTTP요청으로 복잡한 작업을 해결하기 불가능한 웹 어플리케이션에 유용합니다.
준비
이전 예제에서는 단순히 "Hello World"라는 문자열을 보냈지만 우리는 무엇인가 복잡한 작업을 의미하는 메세지를 보내고 싶습니다. 단순히 문자열에 '.'(dot)을 붙여 시간이 걸리는 작업이라는 것을 표현 합시다. 만약 문자열을 한문자씩 보다가 점을 만나면 Tread.sleep(1000)을 실행해 1초를 의미없이 보내는 것 입니다.
예를들어 "Hello..." 라는 문자열은 3초가 걸리는 것이죠
소스코드
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("username");
factory.setPassword("password");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//channel.basicQos(1);
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
String message = new String(delivery.getBody(),"UTF-8");
System.out.println(" [x] Received '"+message+"'");
try {
doWork(message);
}finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false );
}
}
};
boolean autoAck=true;
channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,consumerTag -> {});
}
private static void doWork(String task) {
for(char ch : task.toCharArray()) {
if(ch == '.') {
try {
Thread.sleep(1000);
}catch(InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("username");
factory.setPassword("password");
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
String message = "[0]message.........";
channel.basicPublish("",TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN
,message.getBytes("UTF-8"));
System.out.println(" [x] Sent '"+message+"'");
}
}
}
라운드 로빈 디스패칭
태스크 큐를 사용하는 장점 중 하나는 쉽게 병렬적인 작업을 할 수 있다는 것 입니다.
먼저, 두개의 워커를 실행 합시다. 그들은 큐로부터 메시지들을 가져올 것 입니다.
결과를 보시면 작업 속도에 상관없이 worker1,worker2를 번갈아 가며 실행 되는 것을 보실 수 있습니다.
기본적으로 RabbitMQ는 각 메세지를 다음 컨슈머에게 전달합니다(순차적으로).
평균적으로 모든 컨슈머가 같은 개수의 메세지를 받게 됩니다.
이러한 방식을 라운드 로빈이라고 합니다.
Message acknowledgement
만약 컨슈머가 시간이 오래걸리는 작업을 시작한 후에 다 완료하기도전에 작업을 종료하면 어떨게 될까요?
현재 우리의 코드 상으로는 일단 RabbitMQ가 컨슈머로 메세지를 전달하면 이것을 즉시 삭제를 위한 표시를 합니다.
이 케이스 상으로는 만약 워커를 종료한다면 메세지의 작업은 사라지게 됩니다.
그러나 우리는 어떠한 태스크도 잃고 싶지않습니다. 만약 워커가 종료되었을때 우리는 작업을 다른 워커로 전달하고 싶습니다.
메세지를 절대 잃어버리지 않게 하기 위해서 RabbitMQ는 Message acknowlegements를 지원합니다.
특정 메세지가 받아졌고 처리되었다는 것을 RabbitMQ에 알려주기위해 ACK가 컨슈머에 의해 전송됩니다.
그리고 RabbitMQ는 해당 메세지를 삭제할 수 있습니다.
만약 컨슈머가 ACK를 전송하지 않고 죽는다면 RabbitMQ입장에서는 그 메세지가 완전히 처리되지 않았다고 생각하여 다시 큐에 넣어 질 것 입니다.
만약 다른 컨슈머들이 실행되어 있다면 그 메세지는 다른 컨슈머로 빠르게 다시 전달 될 것입니다.
이러한 방식은 워커가 가끔씩 죽더라도 메세지의 손실은 없다는 걸 확신할 수 있습니다.
타임아웃(기본적으로 30분)은 컨슈머가 ack를 전달하는데 있어서 강제됩니다.
이것은 buggy consumer를 찾아내는데 도움을 줍니다.
이전 예제 에서 autoAck를 false로 바꾸면 메세지가 사라지지 않는 것을 보장합니다.
Message durability
이전엔 워커가 죽었을때 메세지를 어떻게 안 잃어버릴까 였는데 이번엔 RabbitMQ의 서버가 죽어도 메세지를 잃지 않게 하는 방법을 알아 봅시다.
boolean durable = true;
channel.queueDeclare("hello",durable,false,false,null);
큐의 durable 파라미터를 true로 합니다.
이미 hello큐가 있기때문에 제대로 동작 하지 않을 수 있으니 큐의 이름을 "task_queue"로 바꾸는 법도 방법입니다.
물론 큐의 이름을 바꾼다면 worker파일의 큐 이름도 바꿔줘야 합니다.
참조:https://www.rabbitmq.com/tutorials/tutorial-two-java.html