지난 글들에서 MQTT에 대해서 이해를 하는 시간을 가졌다.
이번에는 MQTT를 그럼... 뭐를 써야 하는지 키메라가 정리하는 시간을 가지려고 한다.
MQTT에 대해서는 HiveMQ에서 제공하는 공식문서를 좀 읽어봤던 이력이 있다.
글을 정리하면서 HiveMQ의 공식 문서를 통해서 MQTT에 대한 필수 지식을 들여다 봣지만, 사실 생각보다 별로였는데 좀 더 큰 맥락에서 이를 정리하고 Packet구조에 대해 한눈에 들어오게 정리한 글이 있어서 이를 읽어보길 추천하고 글을 시작하려고 한다.
부끄러운 이야기이긴 하지만 그냥 이 글 하나 읽어보는데... 더 좋다는 판단도 든다. 이전에 본 HiveMQ의 정리글들은 너무 MQTT명세서의 흐름을 따라가서, 큰 맥락에서 머릿속에 그림이 그려지지 않았다.
마지막으로 학습중인 코드는 해당 git 주소에 올렸으니 같이 보면 좋다 (https://github.com/thelovemsg/netty_basecamp)
목표
1. 어떤 MQTT 활용할지 고민한다.
2. 예시 코드의 진행사항을 이해하고, 현재 어떻게 테스트하는지 살펴본다.
0. MQTT Essentials 돌아보기
[Messaging] MQTT이해하기 - 1탄 : MQTT란 무엇인가? + mqtt introduction
[Messaging] MQTT이해하기 - 2탄 : Pub/Sub 구조
[Messaging] MQTT이해하기 - 3탄 : Broker와 Server Connection + Subscribe & Unsubscribe
[Messaging] MQTT이해하기 - 4탄 : Wildcards & Best Practices + QoS
이전 글에서 필자가 판단하기에 여기까지 보면 된다 싶은 내용을 기존에 정리해놨다.
사실 다시 읽기는 재미없다. 빨리 필자는 사용을 하고 싶은 입장이기에 해당 글들의 핵심을 추려서 요약해달라고 Gemini에게 부탁했다. 궁금하면 열어서 읽어보길 바란다.
1. MQTT란 무엇인가? (Introduction)
MQTT(Message Queuing Telemetry Transport)는 제한된 대역폭과 불안정한 네트워크 환경에서 동작하는 IoT(사물인터넷) 디바이스를 위해 설계된 초경량 메시징 프로토콜입니다.
- 동작 원리: 기본적으로 TCP/IP 프로토콜 위에서 동작하지만, HTTP와 달리 헤더 크기가 최소 2 Byte로 매우 작아 네트워크 오버헤드와 배터리 소모를 극단적으로 줄입니다.
- 특징: 클라이언트 간의 직접적인 연결 없이 중앙의 Broker를 거치는 구조를 가지며, 네트워크가 끊겼을 때를 대비한 세션 유지 및 유언 메시지(LWT, Last Will and Testament) 기능을 제공하여 분산 환경에서의 신뢰성을 확보합니다.
2. Pub/Sub 구조 (Publish/Subscribe Architecture)
MQTT의 가장 큰 아키텍처적 특징은 발행-구독(Pub/Sub) 모델을 통한 결합도(Coupling) 최소화입니다.
- 공간적 분리: 메시지를 발행하는 클라이언트(Publisher)와 수신하는 클라이언트(Subscriber)는 서로의 IP나 존재 여부를 알 필요가 없습니다. 오직 Broker의 주소와 메시지의 주제(Topic)만 알면 됩니다.
- 시간적 분리: 두 클라이언트가 동시에 네트워크에 연결되어 있을 필요가 없습니다. (Broker가 세션과 메시지를 보관할 수 있음)
- 동기화 분리: 메시지 발행 및 수신 작업이 비동기적으로 처리되므로, 클라이언트의 메인 스레드가 블로킹되지 않습니다.
3. Broker와 Connection + Subscribe & Unsubscribe
대용량 트래픽 처리 관점에서 Broker는 수많은 클라이언트의 커넥션을 유지하고 메시지를 라우팅하는 핵심 서버 역할을 합니다.
- Connection: 클라이언트가 Broker에게 CONNECT 패킷을 보내고, Broker가 CONNACK 패킷으로 응답하여 TCP 연결을 맺습니다. 이때 Client ID, Clean Session(이전 세션 정보 유지 여부), Keep Alive(연결 상태 확인 주기) 파라미터가 교환됩니다.
- Subscribe / Unsubscribe:
- 연결이 완료된 클라이언트는 관심 있는 Topic을 수신하기 위해 SUBSCRIBE 패킷을 전송하고 SUBACK을 받습니다.
- 구독을 해제할 때는 UNSUBSCRIBE 패킷을 전송하여 더 이상 해당 Topic의 메시지를 수신하지 않도록 Broker의 라우팅 테이블에서 자신을 제거합니다.
4. Wildcards & Best Practices + QoS
효율적인 메시지 라우팅과 신뢰성 보장을 위한 MQTT의 핵심 기능입니다.
Topic Wildcards (토픽 와일드카드) 토픽은 계층적 구조(/)를 가지며, 여러 토픽을 한 번에 구독하기 위해 와일드카드를 사용합니다.
- 단일 레벨(+): 특정 하나의 레벨만 매칭합니다. (예: sensor/+/temperature -> sensor/room1/temperature 매칭, sensor/room1/humidity 실패)
- 다중 레벨(#): 해당 레벨 하위의 모든 계층을 매칭합니다. 반드시 토픽의 마지막에만 사용 가능합니다. (예: sensor/room1/# -> room1 하위의 모든 센서 데이터 매칭)
Best Practices (설계 최우수 사례)
- 최상위 레벨에 와일드카드(#) 사용 금지: Broker에 엄청난 부하를 유발합니다.
- 토픽 이름의 맨 앞이나 뒤에 슬래시(/)를 붙이지 않습니다. (불필요한 공백 레벨 생성 방지)
- 토픽에 공백 문자를 사용하지 않고, ASCII 문자만 사용하여 명확하게 구성합니다.
- 클라이언트 ID를 토픽 트리에 포함시켜 권한 제어와 디버깅을 용이하게 합니다.
QoS (Quality of Service) MQTT는 네트워크 환경에 따라 메시지 전송의 신뢰성을 3단계로 보장합니다.
- QoS 0 (At most once): "Fire and Forget". 메시지를 한 번만 전송하며, 수신 확인을 하지 않습니다. 유실 가능성이 있지만 성능이 가장 빠릅니다.
- QoS 1 (At least once): 메시지가 최소 한 번은 전달됨을 보장합니다. 수신자로부터 PUBACK을 받을 때까지 재전송하므로 중복 수신이 발생할 수 있습니다. 수신 측의 멱등성(Idempotency) 처리가 중요합니다.
- QoS 2 (Exactly once): 메시지가 정확히 한 번만 전달됨을 보장합니다. 4단계 핸드셰이크(PUBREC, PUBREL, PUBCOMP)를 거치므로 오버헤드가 가장 크지만, 과금이나 제어 명령 등 절대 중복되거나 누락되면 안 되는 시스템에 사용됩니다.
사실 컨셉이나 느낌 혹은 vibe라고 해야하나? 그런거는 대~강 알겠다.
그래 그래! 좋고 이런 설정이 있는걸 알겠어. 현재는 이것을 어떻게 활용할 것인지를 고민하고 있다.
다양한 MQTT 브로커도 있고, 버전마다 또 지원하는게 다르다.
1. 목표와 현재 상황 정리
현재 필자가 구현하고자 하는 상황은 다음과 같다.
Netty와 MQTT를 활용한 저지연 차량 모니터링 시스템 구축
- 차량/로봇에서 오는 대량 실시간 데이터 수집 및 처리
- 원격 모니터링/제어를 위한 저지연 통신 서버
- 지도, 센서, 로그 등 대용량 데이터 파이프라인
- 시뮬레이션/테스트 인프라 관리
코드들은 해당 주소(https://github.com/thelovemsg/netty_basecamp)에서 꾸준히 업데이트할 것이다.
현 글을 적는 시점에서의 상황을 요약해서 Gemini에게 정리를 요청했다. 궁금하면 읽어보길 바란다.
Spring 없이 Netty로 차량 관제 시스템 만들기: 현재 구현 현황과 앞으로의 과제
MQTT, 실시간 데이터 처리, 저지연 비동기 서버. 이 세 가지를 한 번에 다룰 수 있는 도메인이 필요했고, '차량 관제'가 딱 맞았다.
GPS 데이터가 끊임없이 들어오고, 수천 대의 연결을 동시에 유지해야 하며, 지연이 생기면 바로 티가 나는 도메인이기 때문이다.
가장 핵심적인 실험 목표는 이것이다. "Spring 없이 순수 Netty로만 이 시스템을 돌리면 어떻게 될까?"
📍 현재까지 완성된 아키텍처와 구현 내용
전체 시스템 구조는 크게 세 덩어리로 나뉜다.
- 가상 차량이 GPS를 쏘는 시뮬레이터
- 메시지를 중계하는 MQTT 브로커
- 데이터를 받아서 저장하고 보여주는 백엔드 서버
지금까지는 시뮬레이터와 서버의 HTTP API, 그리고 프론트엔드 대시보드를 완성했다. 브로커와 서버를 연결하는 Subscriber 구현이 다음 단계다.
1. 순수 Netty 기반의 백엔드 서버 (Port: 8081) 서버는 8081 포트에서 독립적으로 실행된다. Spring의 DispatcherServlet 같은 마법이 없으니 라우팅, 예외 처리, JSON 직렬화, CORS 통제까지 전부 직접 구현했다.
- 스레드 모델 최적화: 도메인 로직은 Virtual Thread로 오프로드(Offload)하여 처리하고, 클라이언트에 응답을 쓸 때는 반드시 EventLoop 스레드로 돌려보내도록 설계했다. Netty의 I/O 연산은 EventLoop 스레드에서만 안전하기 때문이다. 개발 초기 이 원칙을 어겼다가 응답이 클라이언트에 전달되지 않는 현상을 직접 겪으며 체득한 중요한 구조적 포인트다.
2. 도메인 주도 설계 (DDD) 적용 도메인은 Vehicle, Journey, LocationSnapshot 세 개의 Aggregate Root로 설계했다. 여기서 핵심적인 결정은 위치 추적 로직을 tracking이라는 별도의 Bounded Context로 분리한 것이다.
- 처음엔 Vehicle 패키지 안에 Trip(운행)을 함께 두었다. 하지만 위치 추적은 차량에만 한정된 개념이 아니다. 킥보드든 드론이든 추적 대상이 달라져도 Journey와 LocationSnapshot은 그대로 재사용할 수 있어야 한다.
- 이를 위해 TrackingTarget이라는 VO(Value Object)를 만들어 targetId와 targetType의 조합으로 추적 대상을 완벽하게 추상화했다.
3. 시뮬레이터와 대시보드
- MQTT 시뮬레이터: 서울 시내의 랜덤 경로를 GPS 좌표로 생성하여 브로커로 Publish 한다. 클라이언트 라이브러리로 HiveMQ MQTT Client를 선택했다. Eclipse Paho가 Thread-per-Connection 모델인 것과 달리, HiveMQ Client는 내부가 Netty 기반으로 구현되어 있어 수천 대의 차량이 동시에 연결해도 소수의 EventLoop이 다중화(Multiplexing)하여 처리할 수 있기 때문이다.
- 대시보드: 정적 HTML 파일과 Leaflet.js + OpenStreetMap을 결합하여 서울 지도를 띄웠다. 차량을 선택하면 운행 이력 목록이, 운행을 선택하면 실제 GPS 경로가 지도 위에 실시간으로 그려진다.
💡 아키텍처 회고: 핵심 설계 변경 사항
개발을 진행하던 중, Trip과 관련된 HTTP API를 전부 삭제하는 과감한 결정을 내렸다.
기존에는 POST /trips로 운행 엔티티를 생성하고, POST /trips/{id}/depart API를 호출하여 차량을 출발시키는 구조였다.
하지만 이는 현실의 IoT 도메인과 완벽히 정반대의 흐름이다. 실제 환경에서 서버는 차량에게 "운행을 시작하세요"라고 명령하지 않는다. 차량이 스스로 GPS 데이터를 보내기 시작하면, 서버가 그 스트림을 감지하고 "이 차량이 운행을 시작했구나"라고 수동적으로 판단하는 것이 옳다.
서버가 운행을 직접 생성하는 구조는 시뮬레이터에서 잠깐 테스트할 때나 편리할 뿐, 올바른 관제 아키텍처가 아니다. 따라서 Subscriber가 해당 차량의 첫 GPS 페이로드를 수신하는 순간, 서버 내부에서 Journey를 자동 생성하도록 이벤트 흐름을 뒤집는 것이 아키텍처 관점에서 타당하다.
2. MQTT broker 선정

MQTT는 사실 프로토콜일 뿐이다. pub/sub 구조를 따르는 MQTT는 Broker와 Client 를 선정해야한다.
어느 것을 사용해야지 client에서 발행한(publish)한 메시지를 broker에서 받아서 이를 구독자(subscriber)들이 확인할 수 있을까?
우선 어떤 것을 Broker로 사용할지 찾아보자.
2.1) HiveMQ 찾아보기
HiveMQ에서는 이미 무료 Broker와 Client를 제공한다. (궁금하면 여기 클릭 - Free MQTT By HiveMQ)

그런데 start free를 클릭하니 로그인 화면으로 나를 안내한다.

그래서 우선 나오고... 다음을 봣다.

아니 자꾸 홈페이지에서는 Enterprise에 대해서만 설명을 해서 또 찾다보니 hivemq-community-edition도 들어다보게 되었다.
(하 귀찮게 진짜...)
HiveMQ Community Edition은 무료로 사용법도 볼 수 있으며 Docker로 바로 띄울 수 있다.

그리고 상세한 documentation은 해당 github의 wiki에 올려놨다.
(https://github.com/hivemq/hivemq-community-edition/wiki)
정말 친절하게 잘 설명이 되어있다.
HiveMQ Enterprise는 유료로 무슨 차이인가 사이트에서 찾아보았다.

Enterprise Grade란?
엔터프라이즈급이란 대규모 기업이나 조직의 엄격한 기준과 요구 사항을 충족하는 도구 또는 애플리케이션을 의미합니다.
이러한 도구는 기업 수준의 환경에서 요구되는 복잡한 사항들을 처리하도록 특별히 설계되었습니다.
현재 긍정적인 점은 필자는 단순히 학습하는 단계로 enterprise급의 성능까지는 필요가 없다.
HiveMQ의 공식문서를 참고해서 Broker사용법을 익히는게 좋을것 같다는 판단이 든다.
2.2) Mosquitto 찾아보기
다음의 글은 Eclipse Mosquitto 공식 사이트에 있는 글을 번역한 글이다. (mosquitto 공식사이트)
Eclipse Mosquitto는 MQTT 프로토콜 버전 5.0, 3.1.1 및 3.1을 구현하는 오픈 소스(EPL/EDL 라이선스) 메시지 브로커입니다. Mosquitto는 경량화되어 있어 저전력 싱글 보드 컴퓨터부터 풀 서버까지 모든 장치에서 사용하기에 적합합니다.
MQTT 프로토콜은 발행/구독 모델을 사용하여 메시지를 주고받는 경량 방식을 제공합니다. 따라서 저전력 센서나 휴대폰, 임베디드 컴퓨터, 마이크로컨트롤러와 같은 모바일 장치 등 사물 인터넷(IoT) 환경에서의 메시징에 적합합니다.
Mosquitto 프로젝트는 MQTT 클라이언트를 구현하기 위한 C 라이브러리와 매우 인기 있는 mosquitto_pub 및 mosquitto_sub 명령줄 MQTT 클라이언트도 제공합니다.
Mosquitto는 Eclipse Foundation의 일부이며, Cedalo가 개발을 주도하는 iot.eclipse.org 프로젝트입니다.
사실 의도치 않은 수확이 있었는데, MQTT broker에 대해서 타입별로 정리를 잘 해주어서 잘 읽고 간다. (꺼억~)

mosquitto의 공식사이트가 HiveMQ에 비해 훨씬 보기 좋게 글을 잘 적었다.
2.3) Apache Artemis 찾아보기
이건 또 뭐냐? Apache Artemis는 태생이 'MQTT 전용 브로커'는 아니지만 MQTT 3.1.1 및 5.0 사양을 완벽하게 지원하는 멀티 프로토콜 메시지 브로커라고 한다.
Apache Artemis™는 AMQP 1.0, MQTT 3.1.1, MQTT 5, STOMP 등 다양한 업계 표준 프로토콜을 지원하는 멀티 프로토콜 메시지 브로커입니다. 이를 통해 사용자는 Java, JavaScript, C, C++, Python, .Net 등 다양한 언어와 플랫폼으로 작성된 클라이언트를 사용하여 연결할 수 있습니다. Artemis는 사용자의 메시징 사용 사례를 지원하는 강력하고 유연한 기능을 제공합니다.
출처 : https://artemis.apache.org/
그리고 docker로 제공도 해주고 User Manual 도 제공하고 있다.


4. 선정한 MQTT Broker : HiveMQ
사실 다른 MQTT Broker도 많은데, 필자는 현재 학습을 하는 수준이기 때문에 되도록이면 유명한 것을 쓰는게 맞다고 판단했다.
enterprise grade도 서비스한다면 community 레벨도 충분히 학습하고 쓰는데 좋다고 판단했다.
또 현재는 내가 스스로 써보고 끝내는 수준이라서 cloud 레벨에서 지원하는 broker는 현재 필요없다고 판단도 했다.
마지막으로! HiveMQ는 Java-based open sourse MQTT broker다. 또한 HiveMQ는 Netty를 통해서 구현이 되어있고 기술 블로그에서 HiveMQ에 관해 상세히 설명하고 있다.
이보다 더 좋은 가이드와 경험을 전부 제공하는 좋은 open source가 있을까?
뭐... 현재 당장 무엇이라도 사용해서 써보는게 좋지 뭐가 나은지 찾아보는게 나을 것 같다는 판단!
(판단이라는 단어를 몇 번이나 쓴거야?)
5. MQTT 3.x vs MQTT 5.0
글을 읽으면서 MQTT 3.x 랑 MQTT 5.0을 지원한다는 말이 자주 나와서 이게 뭔지 찾아보았다.
사실 이것은 mqtt 공식 사이트에서 MQTT Specifications 에 나와 있는 내용이다.

간단하다. 버전에 따라서 그냥 MQTT 5, MQTT 3.1.1, MQTT 3.1 이렇게 부르는 것이다.
필자가 사용하고자 하는 HiveMQ의 경우에는 5.0 과 3.x버전을 전부 다 지원한다고 했다.
6. HiveMQ MQTT Client
HiveMQ MQTT Client는 java에서 쉽게 메시지를 publish 하고 subscribe하도록 해주는 library다.
7. 실제 코드 실행
현재 가상의 환경인 만큼 이것을 직접 실행했다.
뒤에서 볼 코드들은 전부 git에 올라가 있으니 참고하면 되는데, mqtt관련해서 중요한 부분에 대해 설명하고자 코드를 들여다보려고 한다.
코드는 단순히 MQTT 관련 코드만 있는게 아니라 netty 설정도 있으니 참고 바란다.
우선 도커를 띄운다.
docker-compose.yml
services:
hivemq:
image: hivemq/hivemq-ce
ports:
- "1883:1883"
도커는 알아서 띄워라!(박력) 설마 이걸못하겠어... 숙덕숙덕
그리고 구독과 발행을 쉽게 할 수 있도록 다음을 depencency 에 추가한다.
참고로 필자가 만드는 프로젝트는 gradle이다.
// MQTT 클라이언트 (HiveMQ)
implementation 'com.hivemq:hivemq-mqtt-client:1.3.3'
우선 큰 그림을 보고 접근하자.

잘 안보이지 않는가? 다시 이것을 확대해서 보도록 하겠다.

번호를 따라서 순차적으로 봐보자.
1번을 보면 서버가 시작되면서 브로커에 MQTT 클라이언트 2개를 연결한다.
Publisher 클라이언트 (cartracking-simulator)는 연결만 해둔 상태로 아직 메시지를 보내지는 않았다.
Subscriber 클라이언트 (cartracking-subscriber)는 연결과 동시에 vehicle/+/telemetry 토픽을 구독 시작한다.
publisher이건, subscriber이건 둘 다 client라는 점 잊지 말자!
실행 직후 로그
2026-04-23 22:21:58.608 DEBUG [main] - Using Log4J2 as the default logging framework
2026-04-23 22:21:58.611 DEBUG [main] - -Djava.net.preferIPv4Stack: false
2026-04-23 22:21:58.611 DEBUG [main] - -Djava.net.preferIPv6Addresses: false
2026-04-23 22:21:58.613 DEBUG [main] - Loopback interface: loopback_0 (Software Loopback Interface 1, 0:0:0:0:0:0:0:1)
2026-04-23 22:21:58.616 DEBUG [main] - Java version: 21
2026-04-23 22:21:58.616 DEBUG [main] - -Dio.netty.noUnsafe: false
2026-04-23 22:21:58.617 DEBUG [main] - sun.misc.Unsafe.theUnsafe: available
2026-04-23 22:21:58.618 DEBUG [main] - sun.misc.Unsafe base methods: all available
2026-04-23 22:21:58.618 DEBUG [main] - java.nio.Buffer.address: available
2026-04-23 22:21:58.618 DEBUG [main] - direct buffer constructor: unavailable: Reflective setAccessible(true) disabled
2026-04-23 22:21:58.619 DEBUG [main] - java.nio.Bits.unaligned: available, true
2026-04-23 22:21:58.619 DEBUG [main] - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable: symbolic reference class is not accessible: class jdk.internal.misc.Unsafe, from class io.netty.util.internal.PlatformDependent0 (unnamed module @67e2d983)
2026-04-23 22:21:58.620 DEBUG [main] - java.nio.DirectByteBuffer.<init>(long, {int,long}): unavailable
2026-04-23 22:21:58.620 DEBUG [main] - sun.misc.Unsafe: available
2026-04-23 22:21:58.620 DEBUG [main] - -Dio.netty.tmpdir: C:\Users\thelo\AppData\Local\Temp (java.io.tmpdir)
2026-04-23 22:21:58.620 DEBUG [main] - -Dio.netty.bitMode: 64 (sun.arch.data.model)
2026-04-23 22:21:58.620 DEBUG [main] - Platform: Windows
2026-04-23 22:21:58.620 DEBUG [main] - -Dio.netty.maxDirectMemory: -1 bytes
2026-04-23 22:21:58.621 DEBUG [main] - java.nio.ByteBuffer.cleaner(): available
2026-04-23 22:21:58.621 DEBUG [main] - -Dio.netty.noPreferDirect: false
2026-04-23 22:21:58.623 DEBUG [main] - -Dio.netty.jfr.enabled: true
2026-04-23 22:21:58.624 DEBUG [main] - Failed to get SOMAXCONN from sysctl and file \proc\sys\net\core\somaxconn. Default: 200
2026-04-23 22:21:58.802 DEBUG [main] - Epoll support is not available: null
2026-04-23 22:21:58.813 DEBUG [main] - -Dio.netty.eventLoopThreads: 24
2026-04-23 22:21:58.815 DEBUG [main] - -Dio.netty.noKeySetOptimization: false
2026-04-23 22:21:58.815 DEBUG [main] - -Dio.netty.selectorAutoRebuildThreshold: 512
2026-04-23 22:21:58.820 DEBUG [main] - -Dio.netty.globalEventExecutor.quietPeriodSeconds: 1
2026-04-23 22:21:58.824 DEBUG [main] - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
2026-04-23 22:21:58.824 DEBUG [main] - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
2026-04-23 22:21:58.828 DEBUG [main] - org.jctools-core.MpscChunkedArrayQueue: available
2026-04-23 22:21:58.867 DEBUG [main] - -Dio.netty.leakDetection.level: simple
2026-04-23 22:21:58.867 DEBUG [main] - -Dio.netty.leakDetection.targetRecords: 4
2026-04-23 22:21:58.871 DEBUG [main] - -Dio.netty.buffer.checkAccessible: true
2026-04-23 22:21:58.871 DEBUG [main] - -Dio.netty.buffer.checkBounds: true
2026-04-23 22:21:58.871 DEBUG [main] - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@7e242b4d
2026-04-23 22:21:58.876 DEBUG [main] - -Dio.netty.allocator.useCachedMagazinesForNonEventLoopThreads: false
2026-04-23 22:21:58.883 DEBUG [main] - -Dio.netty.recycler.maxCapacityPerThread: 4096
2026-04-23 22:21:58.883 DEBUG [main] - -Dio.netty.recycler.ratio: 8
2026-04-23 22:21:58.883 DEBUG [main] - -Dio.netty.recycler.chunkSize: 32
2026-04-23 22:21:58.883 DEBUG [main] - -Dio.netty.recycler.blocking: false
2026-04-23 22:21:58.883 DEBUG [main] - -Dio.netty.recycler.batchFastThreadLocalOnly: true
2026-04-23 22:21:58.886 DEBUG [main] - -Dio.netty.allocator.type: adaptive
2026-04-23 22:21:58.886 DEBUG [main] - -Dio.netty.threadLocalDirectBufferSize: 0
2026-04-23 22:21:58.886 DEBUG [main] - -Dio.netty.maxThreadLocalCharBufferSize: 16384
2026-04-23 22:21:58.926 DEBUG [main] - -Dio.netty.processId: 19928 (auto-detected)
2026-04-23 22:21:58.928 DEBUG [main] - -Dio.netty.machineId: 18:c0:4d:ff:fe:a6:66:0f (auto-detected)
2026-04-23 22:21:58.942 DEBUG [main] - -Dio.netty.bootstrap.extensions: null
2026-04-23 22:21:59.187 INFO [main] - MQTT 브로커에 연결되었습니다.
2026-04-23 22:21:59.203 INFO [main] - MQTT Subscriber 브로커에 연결되었습니다.
2026-04-23 22:21:59.225 INFO [main] - CarTracking Server starting on port: 8081 (boss=1, worker=2)
2026-04-23 22:21:59.232 INFO [main] - CarTracking Server started on port: 8081
2026-04-23 22:21:59.237 INFO [RxComputationThreadPool-3] - MQTT subscribe 성공 [topic=vehicle/+/telemetry]
로그로 이렇게 두 개의 클라이언트가 연결햇다는 것을 확인할 수 있다.
사실 두 개라고 표현했지만, netty에서 혼자서 북치고 장구치고 하고 있다는 점은 인지해야한다.
자동차로 실제로 테스트를 할 수 는 없는 노릇이니 말이다.
CarTrackingServer.java
package org.example.netty_basecamp.cartracking.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.example.netty_basecamp.cartracking.netty.channel.CarTrackingChannelInitializer;
import org.example.netty_basecamp.cartracking.netty.rest.route.RouteRegistry;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CarTrackingServer {
private static final Logger logger = LogManager.getLogger();
private final int port;
private final RouteRegistry routeRegistry;
public CarTrackingServer(int port, RouteRegistry routeRegistry) {
this.port = port;
this.routeRegistry = routeRegistry;
}
private static Properties loadConfig() {
Properties props = new Properties();
try (InputStream is = CarTrackingServer.class
.getClassLoader().getResourceAsStream("netty_config.properties")) {
if (is != null) props.load(is);
} catch (Exception e) {
logger.warn("netty_config.properties 로드 실패 — 기본값 사용");
}
return props;
}
public void start() throws InterruptedException {
Properties config = loadConfig();
int bossCount = Integer.parseInt(config.getProperty("netty.thread.group_count", "1"));
int workerCount = Integer.parseInt(config.getProperty("netty.thread.child_count", "4"));
logger.info("CarTracking Server starting on port: {} (boss={}, worker={})", port, bossCount, workerCount);
// RDBMS / ConcurrentMap 등 블로킹 작업을 EventLoop 스레드에서 분리
ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();
IoHandlerFactory ioHandlerFactory = NioIoHandler.newFactory();
EventLoopGroup bossGroup = new MultiThreadIoEventLoopGroup(bossCount, ioHandlerFactory);
EventLoopGroup workerGroup = new MultiThreadIoEventLoopGroup(workerCount, ioHandlerFactory);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new CarTrackingChannelInitializer(routeRegistry, virtualExecutor));
logger.info("CarTracking Server started on port: {}", port);
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
virtualExecutor.shutdown();
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
서버가 시작되면서 netty설정을 한다.
CarTrackingChannelInitializer.java
public class CarTrackingChannelInitializer extends ChannelInitializer<Channel> {
private final RouteRegistry routeRegistry;
private final ExecutorService virtualExecutor;
public CarTrackingChannelInitializer(RouteRegistry routeRegistry, ExecutorService virtualExecutor) {
this.routeRegistry = routeRegistry;
this.virtualExecutor = virtualExecutor;
}
@Override
protected void initChannel(Channel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpServerCodec());
p.addLast(new HttpObjectAggregator(65536));
p.addLast(new HttpRoutingHandler(routeRegistry, virtualExecutor));
}
}
사실 코드를 보면 CarTrackingAppConfig.java 의 initRoutes() 를 먼저 실행한다.
미리 위에서 본 것 처럼 MQTT Broker 에 밑작업을 해놓는 것이다.
CarTrackingAppConfig.java
package org.example.netty_basecamp.cartracking.netty.rest;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import org.example.netty_basecamp.basic.common.service.impl.CurrentTimeGenerator;
import org.example.netty_basecamp.cartracking.mqtt.MqttClientFactory;
import org.example.netty_basecamp.cartracking.mqtt.TelemetryPublisher;
import org.example.netty_basecamp.cartracking.mqtt.VehicleTelemetrySubscriber;
import org.example.netty_basecamp.cartracking.netty.rest.config.SimulatorRouteConfig;
import org.example.netty_basecamp.cartracking.netty.rest.config.TripRouteConfig;
import org.example.netty_basecamp.cartracking.netty.rest.config.VehicleRouteConfig;
import org.example.netty_basecamp.cartracking.netty.rest.route.RouteRegistry;
import org.example.netty_basecamp.cartracking.simulator.SimulatorBootstrap;
import org.example.netty_basecamp.cartracking.tracking.infrastructure.inmemory.InMemoryJourneyRepository;
import org.example.netty_basecamp.cartracking.tracking.infrastructure.inmemory.InMemoryLocationSnapshotRepository;
import org.example.netty_basecamp.cartracking.vehicle.application.TripApplicationService;
import org.example.netty_basecamp.cartracking.vehicle.domain.repository.VehicleRepository;
import org.example.netty_basecamp.cartracking.vehicle.infrastructure.inmemory.InMemoryVehicleRepository;
public class CarTrackingAppConfig {
private static final String MQTT_HOST = "localhost";
private static final int MQTT_PORT = 1883;
public static RouteRegistry initRoutes() {
VehicleRepository vehicleRepository = new InMemoryVehicleRepository();
// TripApplicationService를 한 번만 생성하여 REST API와 Subscriber가 공유
TripApplicationService tripApplicationService = new TripApplicationService(
new InMemoryJourneyRepository(),
vehicleRepository,
new InMemoryLocationSnapshotRepository(),
new CurrentTimeGenerator());
// MQTT Publisher + Simulator
SimulatorBootstrap simulatorBootstrap = initSimulator(vehicleRepository);
// MQTT Subscriber — 브로커로부터 telemetry 수신
initSubscriber(tripApplicationService);
RouteRegistry registry = new RouteRegistry();
VehicleRouteConfig.routes(vehicleRepository).forEach(registry::add);
TripRouteConfig.routes(tripApplicationService).forEach(registry::add);
SimulatorRouteConfig.routes(simulatorBootstrap).forEach(registry::add);
return registry;
}
private static SimulatorBootstrap initSimulator(VehicleRepository vehicleRepository) {
MqttClientFactory factory = new MqttClientFactory(MQTT_HOST, MQTT_PORT);
Mqtt3AsyncClient mqttClient = factory.create("cartracking-simulator");
TelemetryPublisher publisher = new TelemetryPublisher(mqttClient);
publisher.connect();
return new SimulatorBootstrap(vehicleRepository, publisher);
}
private static void initSubscriber(TripApplicationService tripApplicationService) {
MqttClientFactory factory = new MqttClientFactory(MQTT_HOST, MQTT_PORT);
Mqtt3AsyncClient mqttClient = factory.create("cartracking-subscriber");
VehicleTelemetrySubscriber subscriber = new VehicleTelemetrySubscriber(mqttClient, tripApplicationService);
subscriber.connect();
subscriber.subscribe();
}
}
CarTrackingAppConfig.java - initSimulator
/**
* 차량 추적 시뮬레이터를 초기화하는 메서드
* MQTT 연결 설정부터 시뮬레이터 부트스트랩 생성까지 담당
*
* @param vehicleRepository 차량 데이터 접근 레포지토리
* @return 초기화된 SimulatorBootstrap 인스턴스
*/
private static SimulatorBootstrap initSimulator(VehicleRepository vehicleRepository) {
// MQTT 브로커 연결 정보(호스트/포트)를 기반으로 클라이언트 팩토리 생성
MqttClientFactory factory = new MqttClientFactory(MQTT_HOST, MQTT_PORT);
// "cartracking-simulator" ID로 비동기 MQTT 클라이언트 생성
Mqtt3AsyncClient mqttClient = factory.create("cartracking-simulator");
// MQTT 클라이언트를 주입받아 텔레메트리 데이터 발행 담당 객체 생성
TelemetryPublisher publisher = new TelemetryPublisher(mqttClient);
// MQTT 브로커에 실제 연결 수립
publisher.connect();
// 차량 레포지토리와 퍼블리셔를 조합하여 시뮬레이터 부트스트랩 반환
return new SimulatorBootstrap(vehicleRepository, publisher);
}
CarTrackingAppConfig.java - initSubscriber
/**
* 차량 텔레메트리 데이터 구독자를 초기화하는 메서드
* MQTT 브로커로부터 차량 데이터를 수신하여 트립 서비스로 전달하는 역할
*
* @param tripApplicationService 수신한 텔레메트리 데이터를 처리할 트립 애플리케이션 서비스
*/
private static void initSubscriber(TripApplicationService tripApplicationService) {
// MQTT 브로커 연결 정보(호스트/포트)를 기반으로 클라이언트 팩토리 생성
MqttClientFactory factory = new MqttClientFactory(MQTT_HOST, MQTT_PORT);
// "cartracking-subscriber" ID로 비동기 MQTT 클라이언트 생성
// 아까 simulator("cartracking-simulator")와 클라이언트 ID가 다름 — 같은 브로커에 동시 연결 가능
Mqtt3AsyncClient mqttClient = factory.create("cartracking-subscriber");
// MQTT 클라이언트와 트립 서비스를 주입받아 텔레메트리 구독 담당 객체 생성
VehicleTelemetrySubscriber subscriber = new VehicleTelemetrySubscriber(mqttClient, tripApplicationService);
// MQTT 브로커에 실제 연결 수립
subscriber.connect();
// 차량 텔레메트리 토픽 구독 시작 — 이 시점부터 데이터 수신
subscriber.subscribe();
}
코드들을 보면 그런 일들을 해준다. 이게 1번에서 다 일어나는 일이다.
그리고 2,3,4가 있는데 다음과 같다.

2,3,4는 다음 명령어를 보면서 참고하자.
실행 명령어 - window git bash 기준
# 1. 차량 등록 — 시뮬레이터가 움직일 차량을 먼저 만들어야 함
curl -X POST http://localhost:8081/api/cartracking/vehicles -H 'Content-Type: application/json' -d
'{"plateNumber":"12ga3456","type":"SEDAN"}'
# 2. 시뮬레이터 시작 — 차량이 5초마다 가짜 GPS를 브로커에 publish 시작
# Subscriber가 telemetry 수신 시 자동으로 Journey 생성 + snapshot 기록
curl -X POST http://localhost:8081/api/cartracking/simulator/start
# 3. 운행 이력 확인 — Subscriber가 쌓은 데이터 조회
curl http://localhost:8081/api/cartracking/vehicles/1/journeys
# 4. 특정 운행의 경로(snapshot 목록) 조회
curl http://localhost:8081/api/cartracking/journeys/1/route
# 5. 시뮬레이터 종료
curl -X POST http://localhost:8081/api/cartracking/simulator/stop
여기서 4번이랑 5번이 좀 헷갈릴 수 있는데 다음과 같다.

4번은 등록된 차량을 전부 조회해서 virtual thread에게 준다.
그러면 run메소드를 실행하며 위치를 주기적으로 publish한다.
4번에서 우측으로 연결되어 있는 코드가 그것을 보여주는 것이다.
그리고 4번에서 이제 5번을 통해서 브로커로 정보를 publish한다.
다음은 작동하면서 보이는 로그 모습이다.


해당 글을 사실 필자는 정리할 생각이 없었다.
claude code 를 터미널에서 실행해서 skills 에 정리하고 진행사항을 보면서 차근차근 실행하고 있는데, 아무리 정리를 하고 물어봐도 필자가 이해가 안됐다.
결국에는 토큰을 다 소모하고 진행사항과 모르는 사항에 대해서 정리를 하고 잤는데, 다음날 들여봐도 솔직히 이해가 되지를 않았다.
또한 점점 규모가 커지면서 단순하게 cli로 물어보던 질문들로는 해결이 되지 잘 되지 않았으며 너무 빨리 작성되어버린 코드들의 이유와 작동 방식을 이해하기 위해서 많은 시간을 할애하게되었다.
내가 암만 봐도 이해를 못하는데, AI를 쓰는게 무슨 의미인가?
그래서 claude code에게 이 문제점에 대해서 "나만 그렇니? 코드는 AI가 다 작성해도 되는거니?" 라고 질문했다.
답변을 요약하자면 "학습시에는 빠르게 작성하고, 중요한 부분은 직접 하세요!" 라고 답변했다.
수많은 유튜브 내용을 보면 극단적으로 "개발자는 필요 없다!" 하는 사람도 있고, 현재 시점에서는 모르겠으나 몇 개월 전 영상을 보면 "AI는 개발자를 대체할 수 없다!" 라는 내용이 많다.
미국의 컴퓨터과학자, 발명가이자 기업인 또 미래학자로 유명한 레이 커즈와일이 2029년에 AGI가 출현할 것으로 예측하고 있다.
현재의 LLM모델 기반의 AI는 의지가 없고 사용자가 원하는 답변을 잘 찾아서 내놓는 수준이지만(물론 이것도 정말 미친 성능이다!), AGI가 등장하면 하나의 종을 탄생시키는 것이다.
하지만 필자가 생각하는 변하지 않은 진리는 "현 시점에도, 미래에도 사람이 책임지고 이를 리드해야한다는 점인데 이를 위해서는 빠르게 이해하고 머릿속에 계획을 구상하고 이를 검증하고 고객측에게 합리적인 방안을 제시"해야할 가능성이 아직은 여전히! 큰 점이다.
여전히 그래서 필자는 계획은 claude code와 대화하면서 청사진을 그리지만 공식문서를 읽어보고 그림을 직접 그려보고, 나만의 언어로 글을 풀어 쓰게 되는것 같다. 좀있으면 다 짤린다고? 그럼 포기하고 공부 안할건지 묻고싶다.
자! 그럼 질문을 하나 하겠다. 그냥 AI한테 물어보면 다 할 수 있다고? 글쎄요...? 공부 안하면 무엇을 할건가? 결국은 내가 알아야하는데...
출처:
https://www.manubes.com/what-is-mqtt/
https://mvnrepository.com/artifact/com.hivemq/hivemq-mqtt-client
https://www.daytona.io/definitions/e/enterprise-grade
https://github.com/hivemq/hivemq-community-edition
https://github.com/hivemq/hivemq-community-edition/wiki
https://github.com/hivemq/hivemq-mqtt-client
'CS + 인프라 > Messaging' 카테고리의 다른 글
| [Messaging] MQTT 이해하기 - 4탄 : Wildcards & Best Practices + QoS (0) | 2026.04.16 |
|---|---|
| [Messaging] MQTT 이해하기 - 3탄 : Broker와 Server Connection + Subscribe & Unsubscribe (0) | 2026.04.15 |
| [Messaging] MQTT 이해하기 - 2탄 : Pub/Sub 구조 (0) | 2026.04.14 |
| [Messaging] MQTT 이해하기 - 1탄 : MQTT란 무엇인가? + mqtt introduction (0) | 2026.04.14 |