
[Java] Reading the LMAX Disruptor Official Docs, Part 4 (Final): Exploring the Disruptor Code

공대키메라 2026. 4. 13. 21:30

Following on from the previous posts, in this post I want to look at the actual LMAX Disruptor code.

 

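I had actually planned to read the blog posts and analyze the code together in the previous post (click here for the previous post!),

but reading the blog took so... long that I split the code analysis out into its own post.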

 

So, let's... get started again!


Goal

1. Understand the LMAX Disruptor code.


1. A reminder of the structure from the previous posts

 

So! The LMAX Disruptor is said to gain its efficiency by splitting its structure into three parts:

storage, producers, and consumers!

 

However, this diagram was drawn against an old version, and I am going to look at 4.0.0, the most recent version.

 

Comparing it against the early version along the way should also help with understanding.

 

2. Looking into the code - overview

I downloaded it from its Maven page (click here).

 

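Once I opened it up, there turned out to be not that much code compared with other frameworks.

Netty, for example, is so vast that I never managed to get through all of it, but this one is a manageable size.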

 

Screenshot of the code inside lmax.disruptor

 

 

Implementations of WaitStrategy in lmax.disruptor 4.0.0

 

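Personally, I find it a bit of a pity that the core classes are not split out into separate packages.

That said, as far as I understood from the earlier posts, the introduction of the DSL made the library simpler to use.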

 

The user guide (LMAX Disruptor User Guide) lists a lot of components, but since I already got a rough picture from that post, I will go straight to the code.

 

3. Looking into the code - RingBuffer and Sequencer

The part that stuck with me the most while writing up this LMAX Disruptor series is false sharing.

 

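A CPU does not move memory between cores one variable at a time. It moves whole cache lines, which are typically 64 bytes (not 64 bits) on mainstream x86-64 processors. When two threads write to different variables that happen to sit on the same cache line, every write invalidates the other core's copy of that line. That is false sharing.

To prevent this, the code surrounds the hot fields with unused padding fields so that they do not end up on a cache line shared with unrelated data.

You can see this in RingBufferPad, which declares 56 bytes of padding fields.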

 

Screenshot of RingBufferPad as declared inside RingBuffer.java
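To make the padding idea concrete, here is a minimal sketch of the same layering trick on a plain counter. The class and field names (PaddingSketch, LeftPadding, HotValue, PaddedCounter) are made up for illustration and are not part of the Disruptor. It assumes a JVM that lays out superclass fields before subclass fields; the actual layout is JVM-dependent, so treat this as an illustration rather than a guarantee.

```java
final class PaddingSketch {
    // Hypothetical names throughout; this only imitates the pad / fields / pad layering.
    static class LeftPadding {
        // 7 longs = 56 bytes of unused space declared before the hot field.
        protected long p1, p2, p3, p4, p5, p6, p7;
    }

    static class HotValue extends LeftPadding {
        // The field that one thread writes to frequently.
        protected volatile long value;
    }

    static final class PaddedCounter extends HotValue {
        // 7 more longs = 56 bytes of unused space declared after the hot field.
        protected long p9, p10, p11, p12, p13, p14, p15;

        // Safe only because each counter has exactly one writer thread in this sketch.
        void increment() {
            value = value + 1;
        }

        long get() {
            return value;
        }
    }

    // Runs two writer threads, each on its own counter, and returns both totals.
    static long[] run(final int iterations) {
        final PaddedCounter a = new PaddedCounter();
        final PaddedCounter b = new PaddedCounter();
        final Thread t1 = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                a.increment();
            }
        });
        final Thread t2 = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                b.increment();
            }
        });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] {a.get(), b.get()};
    }

    public static void main(final String[] args) {
        final long[] totals = run(1_000_000);
        System.out.println(totals[0] + " " + totals[1]);
    }
}
```

The padding is split across a class hierarchy because a JVM may reorder fields declared within a single class, which could otherwise move the padding away from the field it is meant to protect. The sketch only shows the layout; measuring any speed-up would need a proper benchmark harness.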

 

RingBufferFields extends RingBufferPad, and its constructor receives the EventFactory that creates the entries and the Sequencer to use.

RingBuffer, the class we have seen so many times, in turn extends RingBufferFields.

 

The EventFactory pre-fills the RingBuffer with entries, and the Sequencer that is passed in keeps track of the current sequences.
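The pre-filling and the slot lookup can be sketched without the library. The MiniRing class below is hypothetical and uses java.util.function.Supplier in place of EventFactory. It leaves out the Sequencer entirely, as well as the extra BUFFER_PAD slots that the real RingBufferFields adds at both ends of its array.

```java
import java.util.function.Supplier;

// Hypothetical, stripped-down ring: pre-allocated entries plus mask-based indexing.
final class MiniRing<E> {
    private final Object[] entries;
    private final int indexMask;

    MiniRing(final Supplier<E> factory, final int bufferSize) {
        // A power of two has exactly one bit set, so bitCount must be 1.
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        this.entries = new Object[bufferSize];
        // Pre-fill every slot once; the entries are reused afterwards.
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = factory.get();
        }
    }

    @SuppressWarnings("unchecked")
    E elementAt(final long sequence) {
        // For a power-of-two size, (sequence & mask) equals (sequence % size) for sequence >= 0.
        return (E) entries[(int) (sequence & indexMask)];
    }

    public static void main(final String[] args) {
        final MiniRing<StringBuilder> ring = new MiniRing<>(StringBuilder::new, 8);
        ring.elementAt(3).append("hello");
        // Sequence 11 maps to the same slot as sequence 3 because 11 & 7 == 3.
        System.out.println(ring.elementAt(11)); // prints "hello"
    }
}
```

This is why the constructor insists on a power-of-two buffer size: the ever-growing sequence number can be mapped to a slot with a single bitwise AND instead of a modulo operation.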

 

Implementations of the Sequencer interface in lmax.disruptor 4.0.0

 

Take your time reading through the code.

 

RingBuffer.java

/*
 * Copyright 2011 LMAX Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lmax.disruptor;


import com.lmax.disruptor.dsl.ProducerType;

abstract class RingBufferPad
{
    protected byte
        p10, p11, p12, p13, p14, p15, p16, p17,
        p20, p21, p22, p23, p24, p25, p26, p27,
        p30, p31, p32, p33, p34, p35, p36, p37,
        p40, p41, p42, p43, p44, p45, p46, p47,
        p50, p51, p52, p53, p54, p55, p56, p57,
        p60, p61, p62, p63, p64, p65, p66, p67,
        p70, p71, p72, p73, p74, p75, p76, p77;
}

abstract class RingBufferFields<E> extends RingBufferPad
{
    private static final int BUFFER_PAD = 32;

    private final long indexMask;
    private final E[] entries;
    protected final int bufferSize;
    protected final Sequencer sequencer;

    @SuppressWarnings("unchecked")
    RingBufferFields(
        final EventFactory<E> eventFactory,
        final Sequencer sequencer)
    {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();

        if (bufferSize < 1)
        {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        this.indexMask = bufferSize - 1;
        this.entries = (E[]) new Object[bufferSize + 2 * BUFFER_PAD];
        fill(eventFactory);
    }

    private void fill(final EventFactory<E> eventFactory)
    {
        for (int i = 0; i < bufferSize; i++)
        {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

    protected final E elementAt(final long sequence)
    {
        return entries[BUFFER_PAD + (int) (sequence & indexMask)];
    }
}

/**
 * Ring based store of reusable entries containing the data representing
 * an event being exchanged between event producer and {@link EventProcessor}s.
 *
 * @param <E> implementation storing the data for sharing during exchange or parallel coordination of an event.
 */
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
    /**
     * The initial cursor value
     */
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    protected byte
        p10, p11, p12, p13, p14, p15, p16, p17,
        p20, p21, p22, p23, p24, p25, p26, p27,
        p30, p31, p32, p33, p34, p35, p36, p37,
        p40, p41, p42, p43, p44, p45, p46, p47,
        p50, p51, p52, p53, p54, p55, p56, p57,
        p60, p61, p62, p63, p64, p65, p66, p67,
        p70, p71, p72, p73, p74, p75, p76, p77;

    /**
     * Construct a RingBuffer with the full option set.
     *
     * @param eventFactory to newInstance entries for filling the RingBuffer
     * @param sequencer    sequencer to handle the ordering of events moving through the RingBuffer.
     * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
     */
    RingBuffer(
        final EventFactory<E> eventFactory,
        final Sequencer sequencer)
    {
        super(eventFactory, sequencer);
    }

    /**
     * Create a new multiple producer RingBuffer with the specified wait strategy.
     *
     * @param <E> Class of the event stored in the ring buffer.
     * @param factory      used to create the events within the ring buffer.
     * @param bufferSize   number of elements to create within the ring buffer.
     * @param waitStrategy used to determine how to wait for new elements to become available.
     * @return a constructed ring buffer.
     * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
     * @see MultiProducerSequencer
     */
    public static <E> RingBuffer<E> createMultiProducer(
        final EventFactory<E> factory,
        final int bufferSize,
        final WaitStrategy waitStrategy)
    {
        MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);

        return new RingBuffer<>(factory, sequencer);
    }

    /**
     * Create a new multiple producer RingBuffer using the default wait strategy  {@link BlockingWaitStrategy}.
     *
     * @param <E> Class of the event stored in the ring buffer.
     * @param factory    used to create the events within the ring buffer.
     * @param bufferSize number of elements to create within the ring buffer.
     * @return a constructed ring buffer.
     * @throws IllegalArgumentException if <code>bufferSize</code> is less than 1 or not a power of 2
     * @see MultiProducerSequencer
     */
    public static <E> RingBuffer<E> createMultiProducer(final EventFactory<E> factory, final int bufferSize)
    {
        return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy());
    }

    /**
     * Create a new single producer RingBuffer with the specified wait strategy.
     *
     * @param <E> Class of the event stored in the ring buffer.
     * @param factory      used to create the events within the ring buffer.
     * @param bufferSize   number of elements to create within the ring buffer.
     * @param waitStrategy used to determine how to wait for new elements to become available.
     * @return a constructed ring buffer.
     * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
     * @see SingleProducerSequencer
     */
    public static <E> RingBuffer<E> createSingleProducer(
        final EventFactory<E> factory,
        final int bufferSize,
        final WaitStrategy waitStrategy)
    {
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);

        return new RingBuffer<>(factory, sequencer);
    }

    /**
     * Create a new single producer RingBuffer using the default wait strategy  {@link BlockingWaitStrategy}.
     *
     * @param <E> Class of the event stored in the ring buffer.
     * @param factory    used to create the events within the ring buffer.
     * @param bufferSize number of elements to create within the ring buffer.
     * @return a constructed ring buffer.
     * @throws IllegalArgumentException if <code>bufferSize</code> is less than 1 or not a power of 2
     * @see MultiProducerSequencer
     */
    public static <E> RingBuffer<E> createSingleProducer(final EventFactory<E> factory, final int bufferSize)
    {
        return createSingleProducer(factory, bufferSize, new BlockingWaitStrategy());
    }

    /**
     * Create a new Ring Buffer with the specified producer type (SINGLE or MULTI)
     *
     * @param <E> Class of the event stored in the ring buffer.
     * @param producerType producer type to use {@link ProducerType}.
     * @param factory      used to create events within the ring buffer.
     * @param bufferSize   number of elements to create within the ring buffer.
     * @param waitStrategy used to determine how to wait for new elements to become available.
     * @return a constructed ring buffer.
     * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
     */
    public static <E> RingBuffer<E> create(
        final ProducerType producerType,
        final EventFactory<E> factory,
        final int bufferSize,
        final WaitStrategy waitStrategy)
    {
        switch (producerType)
        {
            case SINGLE:
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
    }

    /**
     * <p>Get the event for a given sequence in the RingBuffer.</p>
     *
     * <p>This call has 2 uses.  Firstly use this call when publishing to a ring buffer.
     * After calling {@link RingBuffer#next()} use this call to get hold of the
     * preallocated event to fill with data before calling {@link RingBuffer#publish(long)}.</p>
     *
     * <p>Secondly use this call when consuming data from the ring buffer.  After calling
     * {@link SequenceBarrier#waitFor(long)} call this method with any value greater than
     * that your current consumer sequence and less than or equal to the value returned from
     * the {@link SequenceBarrier#waitFor(long)} method.</p>
     *
     * @param sequence for the event
     * @return the event for the given sequence
     */
    @Override
    public E get(final long sequence)
    {
        return elementAt(sequence);
    }

    /**
     * Increment and return the next sequence for the ring buffer.  Calls of this
     * method should ensure that they always publish the sequence afterward.  E.g.
     * <pre>
     * long sequence = ringBuffer.next();
     * try {
     *     Event e = ringBuffer.get(sequence);
     *     // Do some work with the event.
     * } finally {
     *     ringBuffer.publish(sequence);
     * }
     * </pre>
     *
     * @return The next sequence to publish to.
     * @see RingBuffer#publish(long)
     * @see RingBuffer#get(long)
     */
    @Override
    public long next()
    {
        return sequencer.next();
    }

    /**
     * The same functionality as {@link RingBuffer#next()}, but allows the caller to claim
     * the next n sequences.
     *
     * @param n number of slots to claim
     * @return sequence number of the highest slot claimed
     * @see Sequencer#next(int)
     */
    @Override
    public long next(final int n)
    {
        return sequencer.next(n);
    }

    /**
     * <p>Increment and return the next sequence for the ring buffer.  Calls of this
     * method should ensure that they always publish the sequence afterward.  E.g.</p>
     * <pre>
     * long sequence = ringBuffer.next();
     * try {
     *     Event e = ringBuffer.get(sequence);
     *     // Do some work with the event.
     * } finally {
     *     ringBuffer.publish(sequence);
     * }
     * </pre>
     * <p>This method will not block if there is not space available in the ring
     * buffer, instead it will throw an {@link InsufficientCapacityException}.</p>
     *
     * @return The next sequence to publish to.
     * @throws InsufficientCapacityException if the necessary space in the ring buffer is not available
     * @see RingBuffer#publish(long)
     * @see RingBuffer#get(long)
     */
    @Override
    public long tryNext() throws InsufficientCapacityException
    {
        return sequencer.tryNext();
    }

    /**
     * The same functionality as {@link RingBuffer#tryNext()}, but allows the caller to attempt
     * to claim the next n sequences.
     *
     * @param n number of slots to claim
     * @return sequence number of the highest slot claimed
     * @throws InsufficientCapacityException if the necessary space in the ring buffer is not available
     */
    @Override
    public long tryNext(final int n) throws InsufficientCapacityException
    {
        return sequencer.tryNext(n);
    }

    /**
     * Sets the cursor to a specific sequence and returns the preallocated entry that is stored there.  This
     * can cause a data race and should only be done in controlled circumstances, e.g. during initialisation.
     *
     * @param sequence The sequence to claim.
     * @return The preallocated event.
     */
    public E claimAndGetPreallocated(final long sequence)
    {
        sequencer.claim(sequence);
        return get(sequence);
    }

    /**
     * Determines if the event for a given sequence is currently available.
     *
     * <p>Note that this does not guarantee that event will still be available
     * on the next interaction with the RingBuffer. For example, it is not
     * necessarily safe to write code like this:
     *
     * <pre>{@code
     * if (ringBuffer.isAvailable(sequence))
     * {
     *     final E e = ringBuffer.get(sequence);
     *     // ...do something with e
     * }
     * }</pre>
     *
     * <p>because there is a race between the reading thread and the writing thread.
     *
     * <p>This method will also return false when querying for sequences that are
     * behind the ring buffer's wrap point.
     *
     * @param sequence The sequence to identify the entry.
     * @return If the event published with the given sequence number is currently available.
     */
    public boolean isAvailable(final long sequence)
    {
        return sequencer.isAvailable(sequence);
    }

    /**
     * Add the specified gating sequences to this instance of the Disruptor.  They will
     * safely and atomically added to the list of gating sequences.
     *
     * @param gatingSequences The sequences to add.
     */
    public void addGatingSequences(final Sequence... gatingSequences)
    {
        sequencer.addGatingSequences(gatingSequences);
    }

    /**
     * Get the minimum sequence value from all of the gating sequences
     * added to this ringBuffer.
     *
     * @return The minimum gating sequence or the cursor sequence if
     * no sequences have been added.
     */
    public long getMinimumGatingSequence()
    {
        return sequencer.getMinimumSequence();
    }

    /**
     * Remove the specified sequence from this ringBuffer.
     *
     * @param sequence to be removed.
     * @return <code>true</code> if this sequence was found, <code>false</code> otherwise.
     */
    public boolean removeGatingSequence(final Sequence sequence)
    {
        return sequencer.removeGatingSequence(sequence);
    }

    /**
     * Create a new SequenceBarrier to be used by an EventProcessor to track which messages
     * are available to be read from the ring buffer given a list of sequences to track.
     *
     * @param sequencesToTrack the additional sequences to track
     * @return A sequence barrier that will track the specified sequences.
     * @see SequenceBarrier
     */
    public SequenceBarrier newBarrier(final Sequence... sequencesToTrack)
    {
        return sequencer.newBarrier(sequencesToTrack);
    }

    /**
     * Creates an event poller for this ring buffer gated on the supplied sequences.
     *
     * @param gatingSequences to be gated on.
     * @return A poller that will gate on this ring buffer and the supplied sequences.
     */
    public EventPoller<E> newPoller(final Sequence... gatingSequences)
    {
        return sequencer.newPoller(this, gatingSequences);
    }

    /**
     * Get the current cursor value for the ring buffer.  The actual value received
     * will depend on the type of {@link Sequencer} that is being used.
     *
     * @see MultiProducerSequencer
     * @see SingleProducerSequencer
     */
    @Override
    public long getCursor()
    {
        return sequencer.getCursor();
    }

    /**
     * The size of the buffer.
     *
     * @return size of buffer
     */
    @Override
    public int getBufferSize()
    {
        return bufferSize;
    }

    /**
     * Given specified <code>requiredCapacity</code> determines if that amount of space
     * is available.  Note, you can not assume that if this method returns <code>true</code>
     * that a call to {@link RingBuffer#next()} will not block.  Especially true if this
     * ring buffer is set up to handle multiple producers.
     *
     * @param requiredCapacity The capacity to check for.
     * @return <code>true</code> If the specified <code>requiredCapacity</code> is available
     * <code>false</code> if not.
     */
    @Override
    public boolean hasAvailableCapacity(final int requiredCapacity)
    {
        return sequencer.hasAvailableCapacity(requiredCapacity);
    }


    /**
     * @see com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslator)
     */
    @Override
    public void publishEvent(final EventTranslator<E> translator)
    {
        final long sequence = sequencer.next();
        translateAndPublish(translator, sequence);
    }

    /**
     * @see com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslator)
     */
    @Override
    public boolean tryPublishEvent(final EventTranslator<E> translator)
    {
        try
        {
            final long sequence = sequencer.tryNext();
            translateAndPublish(translator, sequence);
            return true;
        }
        catch (InsufficientCapacityException e)
        {
            return false;
        }
    }

    /**
     * @see com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorOneArg, Object)
     * com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorOneArg, A)
     */
    @Override
    public <A> void publishEvent(final EventTranslatorOneArg<E, A> translator, final A arg0)
    {
        final long sequence = sequencer.next();
        translateAndPublish(translator, sequence, arg0);
    }

    /**
     * @see com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorOneArg, Object)
     * com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorOneArg, A)
     */
    @Override
    public <A> boolean tryPublishEvent(final EventTranslatorOneArg<E, A> translator, final A arg0)
    {
        try
        {
            final long sequence = sequencer.tryNext();
            translateAndPublish(translator, sequence, arg0);
            return true;
        }
        catch (InsufficientCapacityException e)
        {
            return false;
        }
    }

    /**
     * @see com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorTwoArg, Object, Object)
     * com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorTwoArg, A, B)
     */
    @Override
    public <A, B> void publishEvent(final EventTranslatorTwoArg<E, A, B> translator, final A arg0, final B arg1)
    {
        final long sequence = sequencer.next();
        translateAndPublish(translator, sequence, arg0, arg1);
    }

    /**
     * @see com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorTwoArg, Object, Object)
     * com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorTwoArg, A, B)
     */
    @Override
    public <A, B> boolean tryPublishEvent(final EventTranslatorTwoArg<E, A, B> translator, final A arg0, final B arg1)
    {
        try
        {
            final long sequence = sequencer.tryNext();
            translateAndPublish(translator, sequence, arg0, arg1);
            return true;
        }
        catch (InsufficientCapacityException e)
        {
            return false;
        }
    }

    /**
     * @see com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorThreeArg, Object, Object, Object)
     * com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorThreeArg, A, B, C)
     */
    @Override
    public <A, B, C> void publishEvent(final EventTranslatorThreeArg<E, A, B, C> translator, final A arg0, final B arg1, final C arg2)
    {
        final long sequence = sequencer.next();
        translateAndPublish(translator, sequence, arg0, arg1, arg2);
    }

    /**
     * @see com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorThreeArg, Object, Object, Object)
     * com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorThreeArg, A, B, C)
     */
    @Override
    public <A, B, C> boolean tryPublishEvent(final EventTranslatorThreeArg<E, A, B, C> translator, final A arg0, final B arg1, final C arg2)
    {
        try
        {
            final long sequence = sequencer.tryNext();
            translateAndPublish(translator, sequence, arg0, arg1, arg2);
            return true;
        }
        catch (InsufficientCapacityException e)
        {
            return false;
        }
    }

    /**
     * @see com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorVararg, java.lang.Object...)
     */
    @Override
    public void publishEvent(final EventTranslatorVararg<E> translator, final Object... args)
    {
        final long sequence = sequencer.next();
        translateAndPublish(translator, sequence, args);
    }

    /**
     * @see com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorVararg, java.lang.Object...)
     */
    @Override
    public boolean tryPublishEvent(final EventTranslatorVararg<E> translator, final Object... args)
    {
        try
        {
            final long sequence = sequencer.tryNext();
            translateAndPublish(translator, sequence, args);
            return true;
        }
        catch (InsufficientCapacityException e)
        {
            return false;
        }
    }


    /**
     * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslator[])
     */
    @Override
    public void publishEvents(final EventTranslator<E>[] translators)
    {
        publishEvents(translators, 0, translators.length);
    }

    /**
     * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslator[], int, int)
     */
    @Override
    public void publishEvents(final EventTranslator<E>[] translators, final int batchStartsAt, final int batchSize)
    {
        checkBounds(translators, batchStartsAt, batchSize);
        final long finalSequence = sequencer.next(batchSize);
        translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence);
    }

    /**
     * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslator[])
     */
    @Override
    public boolean tryPublishEvents(final EventTranslator<E>[] translators)
    {
        return tryPublishEvents(translators, 0, translators.length);
    }

    /**
     * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslator[], int, int)
     */
    @Override
    public boolean tryPublishEvents(final EventTranslator<E>[] translators, final int batchStartsAt, final int batchSize)
    {
        checkBounds(translators, batchStartsAt, batchSize);
        try
        {
            final long finalSequence = sequencer.tryNext(batchSize);
            translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence);
            return true;
        }
        catch (InsufficientCapacityException e)
        {
            return false;
        }
    }

    /**
     * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorOneArg, Object[])
     * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorOneArg, A[])
     */
    @Override
    public <A> void publishEvents(final EventTranslatorOneArg<E, A> translator, final A[] arg0)
    {
        publishEvents(translator, 0, arg0.length, arg0);
    }

    /**
     * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorOneArg, int, int, Object[])
     * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorOneArg, int, int, A[])
     */
    @Override
    public <A> void publishEvents(final EventTranslatorOneArg<E, A> translator, final int batchStartsAt, final int batchSize, final A[] arg0)
    {
        checkBounds(arg0, batchStartsAt, batchSize);
        final long finalSequence = sequencer.next(batchSize);
        translateAndPublishBatch(translator, arg0, batchStartsAt, batchSize, finalSequence);
    }

    /**
     * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorOneArg, Object[])
     * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorOneArg, A[])
     */
    @Override
    public <A> boolean tryPublishEvents(final EventTranslatorOneArg<E, A> translator, final A[] arg0)
    {
        return tryPublishEvents(translator, 0, arg0.length, arg0);
    }

    /**
     * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorOneArg, int, int, Object[])
     * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorOneArg, int, int, A[])
     */
    @Override
    public <A> boolean tryPublishEvents(
            final EventTranslatorOneArg<E, A> translator, final int batchStartsAt, final int batchSize, final A[] arg0)
    {
        checkBounds(arg0, batchStartsAt, batchSize);
        try
        {
            final long finalSequence = sequencer.tryNext(batchSize);
            translateAndPublishBatch(translator, arg0, batchStartsAt, batchSize, finalSequence);
            return true;
        }
        catch (InsufficientCapacityException e)
        {
            return false;
        }
    }

    /**
     * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorTwoArg, Object[], Object[])
     * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorTwoArg, A[], B[])
     */
    @Override
    public <A, B> void publishEvents(final EventTranslatorTwoArg<E, A, B> translator, final A[] arg0, final B[] arg1)
    {
        publishEvents(translator, 0, arg0.length, arg0, arg1);
    }

    /**
     * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorTwoArg, int, int, Object[], Object[])
     * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorTwoArg, int, int, A[], B[])
     */
    @Override
    public <A, B> void publishEvents(
        final EventTranslatorTwoArg<E, A, B> translator, final int batchStartsAt, final int batchSize, final A[] arg0, final B[] arg1)
    {
        checkBounds(arg0, arg1, batchStartsAt, batchSize);
        final long finalSequence = sequencer.next(batchSize);
        translateAndPublishBatch(translator, arg0, arg1, batchStartsAt, batchSize, finalSequence);
    }

    /**
     * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorTwoArg, Object[], Object[])
     * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorTwoArg, A[], B[])
     */
    @Override
    public <A, B> boolean tryPublishEvents(final EventTranslatorTwoArg<E, A, B> translator, final A[] arg0, final B[] arg1)
    {
        return tryPublishEvents(translator, 0, arg0.length, arg0, arg1);
    }

    /**
     * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorTwoArg, int, int, Object[], Object[])
     * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorTwoArg, int, int, A[], B[])
     */
    @Override
    public <A, B> boolean tryPublishEvents(
        final EventTranslatorTwoArg<E, A, B> translator, final int batchStartsAt, final int batchSize, final A[] arg0, final B[] arg1)
    {
        checkBounds(arg0, arg1, batchStartsAt, batchSize);
        try
        {
            final long finalSequence = sequencer.tryNext(batchSize);
            translateAndPublishBatch(translator, arg0, arg1, batchStartsAt, batchSize, finalSequence);
            return true;
        }
        catch (InsufficientCapacityException e)
        {
            return false;
        }
    }

    /**
     * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorThreeArg, Object[], Object[], Object[])
     * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorThreeArg, A[], B[], C[])
     */
    @Override
    public <A, B, C> void publishEvents(final EventTranslatorThreeArg<E, A, B, C> translator, final A[] arg0, final B[] arg1, final C[] arg2)
    {
        publishEvents(translator, 0, arg0.length, arg0, arg1, arg2);
    }

    /**
     * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorThreeArg, int, int, Object[], Object[], Object[])
     * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorThreeArg, int, int, A[], B[], C[])
     */
    @Override
    public <A, B, C> void publishEvents(
        final EventTranslatorThreeArg<E, A, B, C> translator, final int batchStartsAt, final int batchSize, final A[] arg0, final B[] arg1, final C[] arg2)
    {
        checkBounds(arg0, arg1, arg2, batchStartsAt, batchSize);
        final long finalSequence = sequencer.next(batchSize);
        translateAndPublishBatch(translator, arg0, arg1, arg2, batchStartsAt, batchSize, finalSequence);
    }

    /**
     * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorThreeArg, Object[], Object[], Object[])
     * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorThreeArg, A[], B[], C[])
     */
    @Override
    public <A, B, C> boolean tryPublishEvents(
        final EventTranslatorThreeArg<E, A, B, C> translator, final A[] arg0, final B[] arg1, final C[] arg2)
    {
        return tryPublishEvents(translator, 0, arg0.length, arg0, arg1, arg2);
    }

    /**
     * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorThreeArg, int, int, Object[], Object[], Object[])
     * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorThreeArg, int, int, A[], B[], C[])
     */
    @Override
    public <A, B, C> boolean tryPublishEvents(
        final EventTranslatorThreeArg<E, A, B, C> translator, final int batchStartsAt, final int batchSize, final A[] arg0, final B[] arg1, final C[] arg2)
    {
        checkBounds(arg0, arg1, arg2, batchStartsAt, batchSize);
        try
        {
            final long finalSequence = sequencer.tryNext(batchSize);
            translateAndPublishBatch(translator, arg0, arg1, arg2, batchStartsAt, batchSize, finalSequence);
            return true;
        }
        catch (InsufficientCapacityException e)
        {
            return false;
        }
    }

    /**
     * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorVararg, java.lang.Object[][])
     */
    @Override
    public void publishEvents(final EventTranslatorVararg<E> translator, final Object[]... args)
    {
        publishEvents(translator, 0, args.length, args);
    }

    /**
     * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorVararg, int, int, java.lang.Object[][])
     */
    @Override
    public void publishEvents(final EventTranslatorVararg<E> translator, final int batchStartsAt, final int batchSize, final Object[]... args)
    {
        checkBounds(batchStartsAt, batchSize, args);
        final long finalSequence = sequencer.next(batchSize);
        translateAndPublishBatch(translator, batchStartsAt, batchSize, finalSequence, args);
    }

    /**
     * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorVararg, java.lang.Object[][])
     */
    @Override
    public boolean tryPublishEvents(final EventTranslatorVararg<E> translator, final Object[]... args)
    {
        return tryPublishEvents(translator, 0, args.length, args);
    }

    /**
     * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorVararg, int, int, java.lang.Object[][])
     */
    @Override
    public boolean tryPublishEvents(
        final EventTranslatorVararg<E> translator, final int batchStartsAt, final int batchSize, final Object[]... args)
    {
        checkBounds(args, batchStartsAt, batchSize);
        try
        {
            final long finalSequence = sequencer.tryNext(batchSize);
            translateAndPublishBatch(translator, batchStartsAt, batchSize, finalSequence, args);
            return true;
        }
        catch (InsufficientCapacityException e)
        {
            return false;
        }
    }

    /**
     * Publish the specified sequence.  This action marks this particular
     * message as being available to be read.
     *
     * @param sequence the sequence to publish.
     */
    @Override
    public void publish(final long sequence)
    {
        sequencer.publish(sequence);
    }

    /**
     * Publish the specified sequences.  This action marks these particular
     * messages as being available to be read.
     *
     * @param lo the lowest sequence number to be published
     * @param hi the highest sequence number to be published
     * @see Sequencer#next(int)
     */
    @Override
    public void publish(final long lo, final long hi)
    {
        sequencer.publish(lo, hi);
    }

    /**
     * Get the remaining capacity for this ringBuffer.
     *
     * @return The number of slots remaining.
     */
    @Override
    public long remainingCapacity()
    {
        return sequencer.remainingCapacity();
    }

    private void checkBounds(final EventTranslator<E>[] translators, final int batchStartsAt, final int batchSize)
    {
        checkBatchSizing(batchStartsAt, batchSize);
        batchOverRuns(translators, batchStartsAt, batchSize);
    }

    private void checkBatchSizing(final int batchStartsAt, final int batchSize)
    {
        if (batchStartsAt < 0 || batchSize < 0)
        {
            throw new IllegalArgumentException("Both batchStartsAt and batchSize must be positive but got: batchStartsAt " + batchStartsAt + " and batchSize " + batchSize);
        }
        else if (batchSize > bufferSize)
        {
            throw new IllegalArgumentException("The ring buffer cannot accommodate " + batchSize + " it only has space for " + bufferSize + " entities.");
        }
    }

    private <A> void checkBounds(final A[] arg0, final int batchStartsAt, final int batchSize)
    {
        checkBatchSizing(batchStartsAt, batchSize);
        batchOverRuns(arg0, batchStartsAt, batchSize);
    }

    private <A, B> void checkBounds(final A[] arg0, final B[] arg1, final int batchStartsAt, final int batchSize)
    {
        checkBatchSizing(batchStartsAt, batchSize);
        batchOverRuns(arg0, batchStartsAt, batchSize);
        batchOverRuns(arg1, batchStartsAt, batchSize);
    }

    private <A, B, C> void checkBounds(
        final A[] arg0, final B[] arg1, final C[] arg2, final int batchStartsAt, final int batchSize)
    {
        checkBatchSizing(batchStartsAt, batchSize);
        batchOverRuns(arg0, batchStartsAt, batchSize);
        batchOverRuns(arg1, batchStartsAt, batchSize);
        batchOverRuns(arg2, batchStartsAt, batchSize);
    }

    private void checkBounds(final int batchStartsAt, final int batchSize, final Object[][] args)
    {
        checkBatchSizing(batchStartsAt, batchSize);
        batchOverRuns(args, batchStartsAt, batchSize);
    }

    private <A> void batchOverRuns(final A[] arg0, final int batchStartsAt, final int batchSize)
    {
        if (batchStartsAt + batchSize > arg0.length)
        {
            throw new IllegalArgumentException(
                "A batchSize of: " + batchSize +
                    " with batchStatsAt of: " + batchStartsAt +
                    " will overrun the available number of arguments: " + (arg0.length - batchStartsAt));
        }
    }

    private void translateAndPublish(final EventTranslator<E> translator, final long sequence)
    {
        try
        {
            translator.translateTo(get(sequence), sequence);
        }
        finally
        {
            sequencer.publish(sequence);
        }
    }

    private <A> void translateAndPublish(final EventTranslatorOneArg<E, A> translator, final long sequence, final A arg0)
    {
        try
        {
            translator.translateTo(get(sequence), sequence, arg0);
        }
        finally
        {
            sequencer.publish(sequence);
        }
    }

    private <A, B> void translateAndPublish(final EventTranslatorTwoArg<E, A, B> translator, final long sequence, final A arg0, final B arg1)
    {
        try
        {
            translator.translateTo(get(sequence), sequence, arg0, arg1);
        }
        finally
        {
            sequencer.publish(sequence);
        }
    }

    private <A, B, C> void translateAndPublish(
        final EventTranslatorThreeArg<E, A, B, C> translator, final long sequence,
        final A arg0, final B arg1, final C arg2)
    {
        try
        {
            translator.translateTo(get(sequence), sequence, arg0, arg1, arg2);
        }
        finally
        {
            sequencer.publish(sequence);
        }
    }

    private void translateAndPublish(final EventTranslatorVararg<E> translator, final long sequence, final Object... args)
    {
        try
        {
            translator.translateTo(get(sequence), sequence, args);
        }
        finally
        {
            sequencer.publish(sequence);
        }
    }

    private void translateAndPublishBatch(
        final EventTranslator<E>[] translators, final int batchStartsAt,
        final int batchSize, final long finalSequence)
    {
        final long initialSequence = finalSequence - (batchSize - 1);
        try
        {
            long sequence = initialSequence;
            final int batchEndsAt = batchStartsAt + batchSize;
            for (int i = batchStartsAt; i < batchEndsAt; i++)
            {
                final EventTranslator<E> translator = translators[i];
                translator.translateTo(get(sequence), sequence++);
            }
        }
        finally
        {
            sequencer.publish(initialSequence, finalSequence);
        }
    }

    private <A> void translateAndPublishBatch(
        final EventTranslatorOneArg<E, A> translator, final A[] arg0,
        final int batchStartsAt, final int batchSize, final long finalSequence)
    {
        final long initialSequence = finalSequence - (batchSize - 1);
        try
        {
            long sequence = initialSequence;
            final int batchEndsAt = batchStartsAt + batchSize;
            for (int i = batchStartsAt; i < batchEndsAt; i++)
            {
                translator.translateTo(get(sequence), sequence++, arg0[i]);
            }
        }
        finally
        {
            sequencer.publish(initialSequence, finalSequence);
        }
    }

    private <A, B> void translateAndPublishBatch(
        final EventTranslatorTwoArg<E, A, B> translator, final A[] arg0,
        final B[] arg1, final int batchStartsAt, final int batchSize,
        final long finalSequence)
    {
        final long initialSequence = finalSequence - (batchSize - 1);
        try
        {
            long sequence = initialSequence;
            final int batchEndsAt = batchStartsAt + batchSize;
            for (int i = batchStartsAt; i < batchEndsAt; i++)
            {
                translator.translateTo(get(sequence), sequence++, arg0[i], arg1[i]);
            }
        }
        finally
        {
            sequencer.publish(initialSequence, finalSequence);
        }
    }

    private <A, B, C> void translateAndPublishBatch(
        final EventTranslatorThreeArg<E, A, B, C> translator,
        final A[] arg0, final B[] arg1, final C[] arg2, final int batchStartsAt,
        final int batchSize, final long finalSequence)
    {
        final long initialSequence = finalSequence - (batchSize - 1);
        try
        {
            long sequence = initialSequence;
            final int batchEndsAt = batchStartsAt + batchSize;
            for (int i = batchStartsAt; i < batchEndsAt; i++)
            {
                translator.translateTo(get(sequence), sequence++, arg0[i], arg1[i], arg2[i]);
            }
        }
        finally
        {
            sequencer.publish(initialSequence, finalSequence);
        }
    }

    private void translateAndPublishBatch(
        final EventTranslatorVararg<E> translator, final int batchStartsAt,
        final int batchSize, final long finalSequence, final Object[][] args)
    {
        final long initialSequence = finalSequence - (batchSize - 1);
        try
        {
            long sequence = initialSequence;
            final int batchEndsAt = batchStartsAt + batchSize;
            for (int i = batchStartsAt; i < batchEndsAt; i++)
            {
                translator.translateTo(get(sequence), sequence++, args[i]);
            }
        }
        finally
        {
            sequencer.publish(initialSequence, finalSequence);
        }
    }

    @Override
    public String toString()
    {
        return "RingBuffer{" +
            "bufferSize=" + bufferSize +
            ", sequencer=" + sequencer +
            "}";
    }
}

 

The RingBuffer creates the producer side (the Sequencer) itself. Along with the EventFactory, it curiously also hands bufferSize over to the Sequencer.

 

I wondered why that was necessary, so I asked Gemini.

 

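bufferSize is more than just "how big should the array be". It is the reference value for the safety check (the wrap point) that keeps a producer from overwriting data a consumer has not read yet, and it is the raw material for the bit mask used for extremely fast array indexing. That is why it is passed to the Sequencer, the core synchronization object.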
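
To make the "bit mask" part of that answer concrete, here is a minimal sketch of how a power-of-two bufferSize lets a sequence be mapped to an array index with one AND instead of a modulo. The class and method names (`IndexMaskSketch`, `indexOf`) are my own, hypothetical ones; this only illustrates the idea and is not the library's code.

```java
// Hypothetical sketch: mapping an ever-growing sequence to a slot index.
final class IndexMaskSketch {
    private final int indexMask;

    IndexMaskSketch(int bufferSize) {
        // The mask trick only works when bufferSize is a power of two.
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1; // e.g. 8 -> binary 0111
    }

    // Same result as sequence % bufferSize for non-negative sequences,
    // but computed with a single bitwise AND.
    int indexOf(long sequence) {
        return (int) (sequence & indexMask);
    }

    public static void main(String[] args) {
        IndexMaskSketch ring = new IndexMaskSketch(8);
        for (long seq = 6; seq <= 9; seq++) {
            System.out.println("sequence " + seq + " -> slot " + ring.indexOf(seq));
        }
    }
}
```

With a buffer of 8 slots, sequences 8 and 9 land on slots 0 and 1 again, which is exactly the "ring" behavior.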

 

Thinking back to the previous post (linked here), in the multi-producer case the RingBuffer's sequence could only move forward step by step once the rearmost sequence had been updated.

 

In other words, the key point is that a producer must never overwrite data that a consumer has not yet processed.

 

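As I understand it, this is why bufferSize is indispensable: it is the reference value the Sequencer uses for gating, so that before it hands a slot to a producer it can make sure the producer does not overtake the consumers.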
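
The gating idea can be sketched for the simplest case of one producer and one consumer. The names below (`GatingSketch`, `tryClaim`, `consumeUpTo`, `NO_CAPACITY`) are hypothetical, and returning a sentinel is my simplification; in the RingBuffer code above, `tryNext` signals the same situation with an InsufficientCapacityException.

```java
// Hypothetical single-producer, single-consumer sketch of wrap-point gating.
final class GatingSketch {
    static final long NO_CAPACITY = Long.MIN_VALUE; // sentinel used only in this sketch

    private final int bufferSize;
    private long lastClaimed = -1;      // last sequence handed to the producer
    private long consumerSequence = -1; // last sequence the consumer has finished

    GatingSketch(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    // Claims the next sequence unless that would overwrite an unread slot.
    long tryClaim() {
        long next = lastClaimed + 1;
        // The slot for 'next' was last used by sequence (next - bufferSize).
        long wrapPoint = next - bufferSize;
        if (wrapPoint > consumerSequence) {
            return NO_CAPACITY; // the consumer has not read that slot yet
        }
        lastClaimed = next;
        return next;
    }

    // Called by the consumer once it has processed everything up to 'sequence'.
    void consumeUpTo(long sequence) {
        consumerSequence = sequence;
    }
}
```

With bufferSize 4, sequences 0 to 3 can be claimed right away, the claim for 4 is refused until the consumer has finished sequence 0, and so on.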

 

There are also a lot of methods named publish-something. What on earth do they all do?

 

To understand them, I first looked at the EventTranslator interface.

 

EventTranslator.java

/*
 * Copyright 2011 LMAX Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lmax.disruptor;

/**
 * Implementations translate (write) data representations into events claimed from the {@link RingBuffer}.
 *
 * <p>When publishing to the RingBuffer, provide an EventTranslator. The RingBuffer will select the next available
 * event by sequence and provide it to the EventTranslator (which should update the event), before publishing
 * the sequence update.
 *
 * @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
 */
public interface EventTranslator<T>
{
    /**
     * Translate a data representation into fields set in given event
     *
     * @param event    into which the data should be translated.
     * @param sequence that is assigned to event.
     */
    void translateTo(T event, long sequence);
}

 

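In other words, an implementation translates (writes) a data representation into an event claimed from the RingBuffer. When you publish to the RingBuffer you provide an EventTranslator; the RingBuffer picks the next available event by sequence, hands it to the EventTranslator (which is expected to update the event), and then publishes the sequence update.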

 

Looking at EventSink as well, publishing is precisely the act of making the written event visible to other threads.

 

The flow is: (1) claim a sequence number, (2) write, and finally (3) commit via publish.

 

In the RingBuffer code, the claim step calls the sequencer's next(final int n) method (or tryNext), which reserves the next n slots and returns the highest claimed sequence number.

 

The commit is then handled by the sequencer's publish method.
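
The three steps can be sketched with nothing but the JDK. This is a hypothetical single-producer toy (`ClaimWritePublishSketch`, `ValueEvent`, and `publishedCursor` are names I made up, and gating against consumers is left out); its `publishEvent` only mirrors the try/finally shape of `translateAndPublish` shown above.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.ObjLongConsumer;

// Hypothetical single-producer sketch of the claim -> write -> publish flow.
final class ClaimWritePublishSketch {
    // Mutable event living in a slot (stand-in for a user-defined Event).
    static final class ValueEvent {
        long value;
    }

    private final ValueEvent[] slots;
    private final int indexMask;
    private long claimed = -1;                            // producer-private: last claimed sequence
    private final AtomicLong cursor = new AtomicLong(-1); // last published sequence, read by consumers

    ClaimWritePublishSketch(int bufferSize) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        slots = new ValueEvent[bufferSize];
        for (int i = 0; i < bufferSize; i++) {
            slots[i] = new ValueEvent();
        }
        indexMask = bufferSize - 1;
    }

    // (1) claim: reserve the next sequence (no gating in this sketch).
    long next() {
        return ++claimed;
    }

    // Access the slot that belongs to a sequence, for step (2) write.
    ValueEvent get(long sequence) {
        return slots[(int) (sequence & indexMask)];
    }

    // (3) commit: advancing the cursor is what makes the event visible to readers.
    void publish(long sequence) {
        cursor.set(sequence);
    }

    long publishedCursor() {
        return cursor.get();
    }

    // All three steps in one call; publish sits in finally so that a claimed
    // slot is never left unpublished, even if the translator throws.
    void publishEvent(ObjLongConsumer<ValueEvent> translator) {
        long sequence = next();
        try {
            translator.accept(get(sequence), sequence);
        } finally {
            publish(sequence);
        }
    }
}
```

A caller would write something like `ring.publishEvent((event, sequence) -> event.value = 42L);`. Until `publish` runs, the cursor still points at the previous sequence, so a reader that only trusts the cursor never sees a half-written event.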

 

So what is an Event here? (with help from Gemini)

 

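  • Definition: An Event is a mutable plain Java object (POJO) that carries business data.
  • Allocation: When the RingBuffer is created, a fixed number of events are allocated up front. (Allocated back to back, they are likely to sit close together in memory, which helps the cache hit rate.)
  • Usage: The many publishEvent(EventTranslator...) methods seen earlier do exactly this: they grab one of these empty, circulating Event baskets and set its member fields to new values (translate).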

 

Which means... one RingBuffer cannot hold several differently declared Event types, and if you need that, the Event has to be made big enough from the start.
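
Here is a small sketch of what "allocated up front and reused" means in practice. `PreallocatedSlotsSketch` and `ReusableOrderEvent` are hypothetical names, and a plain `Supplier` stands in for the EventFactory.

```java
import java.util.function.Supplier;

// Hypothetical event type: one mutable object per slot, reused forever.
final class ReusableOrderEvent {
    long orderId;
}

// Hypothetical sketch: every slot is filled once at construction time.
final class PreallocatedSlotsSketch<E> {
    private final Object[] slots;
    private final int indexMask;

    PreallocatedSlotsSketch(Supplier<E> factory, int bufferSize) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        slots = new Object[bufferSize];
        for (int i = 0; i < bufferSize; i++) {
            slots[i] = factory.get(); // the only place events are ever created
        }
        indexMask = bufferSize - 1;
    }

    @SuppressWarnings("unchecked")
    E get(long sequence) {
        return (E) slots[(int) (sequence & indexMask)];
    }
}
```

With 4 slots, `get(0)` and `get(4)` return the very same object, so publishing sequence 4 overwrites the fields that sequence 0 once used. Since the factory decides the type of every slot at startup, there is nowhere for a second event type to go.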

 

If you're curious why, read on; the following is from my conversation with Gemini.

 


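That is a sharp and accurate insight. What you are asking about is exactly the architectural limitation and design dilemma that developers run into first when adopting the LMAX Disruptor.

To state the conclusion first: putting **several kinds of your own POJOs into one `RingBuffer` through polymorphism (`Object` or a common interface) is practically unworkable and is an anti-pattern that is strongly discouraged**.

Here are the reasons, followed by the architectural technique used in practice to solve the problem.

### 1. Hardware and structural reasons you cannot mix different POJOs

**A. Mismatched object sizes and broken cache-line behavior**
The core of the Disruptor is the 'cache line padding' and 'contiguous memory allocation' mentioned earlier. When objects of the same size and layout sit side by side behind the `RingBuffer` array, the CPU's hardware prefetcher can pull the next data into cache ahead of time.
If the memory footprint of the objects varies from slot to slot, the cache hit ratio tends to drop sharply.

**B. EventFactory pre-allocation becomes impossible**
The `RingBuffer` calls `EventFactory.newInstance()` at startup to fill the baskets in advance. If POJOs of type A, B, and C arrive in arbitrary order, there is no way to decide at startup which type of basket to create.

**C. Downcasting and branching costs**
When a consumer takes an event out, it has to check the type with `instanceof` and cast every time to find out which POJO it is. In a pipeline handling millions of events per second, these runtime type checks and the pipeline flushes caused by mispredicted if-else branches can seriously hurt performance.

---

### 2. The practical solution: the Envelope Pattern (Fat Event)

So how is the Disruptor used for complex business logic in which different data (POJOs) such as "order processing", "payment processing", and "stock deduction" have to flow through in sequence?

The most widely used architectural pattern is to design a **'Fat Event'**, also known as the **'Envelope Pattern'**.

You turn the RingBuffer's Event class into an 'envelope' and put every POJO field you might need inside it.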

```java
public enum CommandType {
    CREATE_ORDER, PROCESS_PAYMENT, DEDUCT_STOCK
}

// Fat event (the single Event class stored in the RingBuffer).
// OrderData, PaymentData and StockData are assumed to be simple mutable
// holders that expose an update(...) method; they are not defined here.
public class TransactionEvent {
    // 1. Identifier for the kind of data currently held in this basket
    private CommandType type;

    // 2. Storage for every POJO that can occur, allocated up front (for zero GC)
    private final OrderData orderData = new OrderData();
    private final PaymentData paymentData = new PaymentData();
    private final StockData stockData = new StockData();

    // Setter used when publishing an order event
    public void setOrderEvent(long orderId, String item) {
        this.type = CommandType.CREATE_ORDER;
        this.orderData.update(orderId, item); // updates fields in place instead of creating an object (avoids GC)
    }

    // Setter used when publishing a payment event
    public void setPaymentEvent(long paymentId, long amount) {
        this.type = CommandType.PROCESS_PAYMENT;
        this.paymentData.update(paymentId, amount);
    }

    public CommandType getType() { return type; }
    // Getters...
}
```

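#### Advantages and trade-offs of this approach
* **Zero GC:** All sub-POJOs are allocated once at application startup, so publishing an event allocates nothing new for the garbage collector to clean up.
* **Fast routing:** Instead of `instanceof`, a consumer uses a simple `switch (event.getType())` to quickly pick out and handle only the work assigned to it.
* **Memory waste (space-time trade-off):** While only order events are being processed, the payment data space sits empty and still occupies memory. This looks like wasted RAM, but on a backend server it is a **deliberately calculated waste**: a few objects' worth of memory is traded for CPU cache hit rate and GC suppression, that is, for raw speed.

### Summary
Pushing several of your own POJOs through a single RingBuffer by way of polymorphism defeats the purpose of the framework.

If different kinds of data have to be processed in one pipeline, the standard approach in low-latency, high-performance architectures is to **define one large Event (an envelope object) that can hold every POJO**. If the domains are entirely separate, it is better to split them into two RingBuffers (Disruptor instances).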
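
The consumer side of the fat event is not shown in the answer, so here is my own rough sketch of the `switch`-based routing it describes. Everything in it is hypothetical: the event is cut down to a few primitive fields, and `handle` stands in for the body of an event handler.

```java
// Hypothetical sketch: routing a fat event by its type tag instead of instanceof.
final class FatEventRoutingSketch {
    enum CommandType { CREATE_ORDER, PROCESS_PAYMENT, DEDUCT_STOCK }

    // Simplified fat event: a type tag plus fields for every possible payload.
    static final class TransactionEvent {
        CommandType type;
        long orderId;
        long paymentAmount;
        int stockDelta;
    }

    // Reads only the fields that belong to the current type; the others may
    // still hold stale values from an earlier use of the same slot.
    static String handle(TransactionEvent event) {
        switch (event.type) {
            case CREATE_ORDER:
                return "order " + event.orderId;
            case PROCESS_PAYMENT:
                return "payment " + event.paymentAmount;
            case DEDUCT_STOCK:
                return "stock " + event.stockDelta;
            default:
                throw new IllegalStateException("unknown type: " + event.type);
        }
    }
}
```

One thing this makes visible: because slots are reused, a handler must trust the type tag and never assume that the unrelated fields are empty.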

 


 

Let's take a look at the Sequencer here as well.

 

Sequencer.java

/*
 * Copyright 2012 LMAX Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lmax.disruptor;

/**
 * Coordinates claiming sequences for access to a data structure while tracking dependent {@link Sequence}s
 */
public interface Sequencer extends Cursored, Sequenced
{
    /**
     * Set to -1 as sequence starting point
     */
    long INITIAL_CURSOR_VALUE = -1L;

    /**
     * Claim a specific sequence.  Only used if initialising the ring buffer to
     * a specific value.
     *
     * @param sequence The sequence to initialise too.
     */
    void claim(long sequence);

    /**
     * Confirms if a sequence is published and the event is available for use; non-blocking.
     *
     * @param sequence of the buffer to check
     * @return true if the sequence is available for use, false if not
     */
    boolean isAvailable(long sequence);

    /**
     * Add the specified gating sequences to this instance of the Disruptor.  They will
     * safely and atomically added to the list of gating sequences.
     *
     * @param gatingSequences The sequences to add.
     */
    void addGatingSequences(Sequence... gatingSequences);

    /**
     * Remove the specified sequence from this sequencer.
     *
     * @param sequence to be removed.
     * @return <code>true</code> if this sequence was found, <code>false</code> otherwise.
     */
    boolean removeGatingSequence(Sequence sequence);

    /**
     * Create a new SequenceBarrier to be used by an EventProcessor to track which messages
     * are available to be read from the ring buffer given a list of sequences to track.
     *
     * @param sequencesToTrack All of the sequences that the newly constructed barrier will wait on.
     * @return A sequence barrier that will track the specified sequences.
     * @see SequenceBarrier
     */
    SequenceBarrier newBarrier(Sequence... sequencesToTrack);

    /**
     * Get the minimum sequence value from all of the gating sequences
     * added to this ringBuffer.
     *
     * @return The minimum gating sequence or the cursor sequence if
     * no sequences have been added.
     */
    long getMinimumSequence();

    /**
     * Get the highest sequence number that can be safely read from the ring buffer.  Depending
     * on the implementation of the Sequencer this call may need to scan a number of values
     * in the Sequencer.  The scan will range from nextSequence to availableSequence.  If
     * there are no available values <code>&gt;= nextSequence</code> the return value will be
     * <code>nextSequence - 1</code>.  To work correctly a consumer should pass a value that
     * is 1 higher than the last sequence that was successfully processed.
     *
     * @param nextSequence      The sequence to start scanning from.
     * @param availableSequence The sequence to scan to.
     * @return The highest value that can be safely read, will be at least <code>nextSequence - 1</code>.
     */
    long getHighestPublishedSequence(long nextSequence, long availableSequence);

    /**
     * Creates an event poller from this sequencer
     *
     * @param provider from which events are drawn
     * @param gatingSequences sequences to be gated on
     * @param <T> the type of the event
     * @return the event poller
     */
    <T> EventPoller<T> newPoller(DataProvider<T> provider, Sequence... gatingSequences);
}

 

 

Sequencer.java is an interface; as seen earlier, its implementations are SingleProducerSequencer and MultiProducerSequencer.

 

Of the two, I'll look at MultiProducerSequencer rather than SingleProducerSequencer.

 

The code contains the kind of methods you would reasonably expect to find when implementing a queue.

 

There is capacity measurement, an availability check (isAvailable), and a call to signalAllWhenBlocking() on publish.
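
To get a feel for why a multi-producer sequencer needs isAvailable at all, here is a simplified sketch under my own assumptions. With several producers, sequence 1 can be published before sequence 0, so a single cursor is not enough and each slot needs its own "published" marker. As far as I can tell MultiProducerSequencer tracks something similar per slot, but the class below (`AvailabilitySketch`) is a hypothetical toy and not the library's implementation. Its `highestPublished` follows the contract described in the Javadoc of `getHighestPublishedSequence` above.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical sketch: per-slot availability flags for out-of-order publishing.
final class AvailabilitySketch {
    private final AtomicIntegerArray availableBuffer;
    private final int indexMask;
    private final int indexShift;

    AvailabilitySketch(int bufferSize) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        availableBuffer = new AtomicIntegerArray(bufferSize);
        indexMask = bufferSize - 1;
        indexShift = Integer.numberOfTrailingZeros(bufferSize); // log2(bufferSize)
        for (int i = 0; i < bufferSize; i++) {
            availableBuffer.set(i, -1); // nothing published yet
        }
    }

    // Marks one sequence as published by storing its lap number in its slot.
    void publish(long sequence) {
        availableBuffer.set(index(sequence), lap(sequence));
    }

    // A sequence is available only if its slot holds the lap number of that
    // exact sequence, so a value left over from the previous lap does not count.
    boolean isAvailable(long sequence) {
        return availableBuffer.get(index(sequence)) == lap(sequence);
    }

    // Highest sequence in [from, to] with no unpublished gap before it;
    // returns from - 1 when 'from' itself is not published yet.
    long highestPublished(long from, long to) {
        for (long s = from; s <= to; s++) {
            if (!isAvailable(s)) {
                return s - 1;
            }
        }
        return to;
    }

    private int index(long sequence) {
        return (int) (sequence & indexMask);
    }

    private int lap(long sequence) {
        return (int) (sequence >>> indexShift);
    }
}
```

If sequence 1 is published first, `highestPublished(0, 1)` still returns -1, and only after sequence 0 is published does it return 1. That is how a consumer avoids reading past a gap.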

 

The signalAllWhenBlocking() method defined on the WaitStrategy interface does what its name says: it signals consumer threads that are currently asleep in a blocking wait that new data has arrived and it is time to wake up.

 

BlockingWaitStrategy.java

public final class BlockingWaitStrategy implements WaitStrategy
{
    private final Object mutex = new Object();

    @Override
    public long waitFor(final long sequence, final Sequence cursorSequence, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;
        if (cursorSequence.get() < sequence)
        {
            synchronized (mutex)
            {
                while (cursorSequence.get() < sequence)
                {
                    barrier.checkAlert();
                    mutex.wait();
                }
            }
        }

        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            barrier.checkAlert();
            Thread.onSpinWait();
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking()
    {
        synchronized (mutex)
        {
            mutex.notifyAll();
        }
    }

    @Override
    public String toString()
    {
        return "BlockingWaitStrategy{" +
            "mutex=" + mutex +
            '}';
    }
}

 

BusySpinWaitStrategy.java

public final class BusySpinWaitStrategy implements WaitStrategy
{
    @Override
    public long waitFor(
        final long sequence, final Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;

        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            barrier.checkAlert();
            Thread.onSpinWait();
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking()
    {
    }
}

 

To summarize:

 

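  • BlockingWaitStrategy: "I'll sleep until data arrives, so wake me up when it gets here." (saves CPU, slower response) -> which is why wake-up code (notifyAll) is needed.
  • BusySpinWaitStrategy: "I'll stay awake and keep watching, so just leave when you're done." (fastest response, burns one CPU core) -> which is why no wake-up code (alarm) is needed.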
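
The two styles can be put side by side in a JDK-only sketch. `WaitStyleSketch` and its methods are hypothetical names of mine; the structure just imitates the monitor wait/notifyAll and the `Thread.onSpinWait()` loop seen in the two strategies above, with the barrier and alert handling left out.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: one cursor, two ways of waiting for it to advance.
final class WaitStyleSketch {
    private final AtomicLong cursor = new AtomicLong(-1);
    private final Object mutex = new Object();

    // Blocking style: sleep on a monitor until the producer signals.
    long blockingWaitFor(long sequence) throws InterruptedException {
        synchronized (mutex) {
            while (cursor.get() < sequence) {
                mutex.wait(); // releases the monitor and parks this thread
            }
        }
        return cursor.get();
    }

    // Busy-spin style: keep re-reading the cursor, never sleep.
    long spinWaitFor(long sequence) {
        long available;
        while ((available = cursor.get()) < sequence) {
            Thread.onSpinWait(); // hint to the CPU that this is a spin loop
        }
        return available;
    }

    // Producer side: advance the cursor, then wake any sleeping waiters.
    // Spinning waiters need no signal; they notice the new value on their own.
    void publish(long sequence) {
        cursor.set(sequence);
        synchronized (mutex) {
            mutex.notifyAll();
        }
    }

    // Runs one waiter of each style and returns what each of them observed.
    static long[] runDemo() {
        WaitStyleSketch gate = new WaitStyleSketch();
        long[] seen = {Long.MIN_VALUE, Long.MIN_VALUE};
        Thread blocker = new Thread(() -> {
            try {
                seen[0] = gate.blockingWaitFor(0);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread spinner = new Thread(() -> seen[1] = gate.spinWaitFor(0));
        blocker.start();
        spinner.start();
        gate.publish(0);
        try {
            blocker.join();
            spinner.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```

Both waiters end up seeing sequence 0. The difference is what they do in the meantime: the blocking one gives up its core until `notifyAll`, the spinning one keeps a core busy the whole time.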

 

 

I remember reading in the official documentation that busy spinning, which avoids extra work such as locking, is how the speed was pushed to its maximum.

 

There are plenty of other WaitStrategy implementations, though; apparently you just pick the one that suits your use case.

 

What follows is from my conversation with Gemini; I recommend reading it if you're curious.

 

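"If spinning (BusySpin) is that fast, why on earth bother providing a slow Blocking strategy that has to pay for context switches?"

This is a core question that has to be addressed when designing an architecture. In short: **not every system needs extreme ultra-low latency (on the order of microseconds), and real-world hardware environments and operating costs impose limits**.

From a practical system design standpoint, here are four reasons `BlockingWaitStrategy` is needed.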

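### 1. Limits of virtualized cloud environments (AWS, GCP, etc.)
Most modern backend systems run on AWS EC2 or in containers. The CPUs provided there are not physical cores but logically partitioned **virtual cores (vCPUs)**.
* In a virtualized environment, a thread that busy-spins in an endless loop and occupies 100% of a vCPU puts heavy load on the hypervisor.
* It can even take CPU time away from other threads and processes, so that overall system performance collapses (CPU starvation).
* Pinning a CPU core exclusively to one thread (core pinning) is hard to do in the cloud, so a Blocking strategy that cleanly puts the thread to sleep is far more stable.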

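### 2. CPU resource efficiency and cost
It is rare to run nothing but a single Disruptor pipeline on a server. Tomcat threads, database connection pools, monitoring agents, and many other processes run alongside it.
* If there are three consumer threads and all of them use `BusySpin`, three of the server's CPU cores will sit at 100% utilization around the clock.
* Burning cores even in the early hours when there is no traffic translates directly into wasted electricity and cloud infrastructure cost. Blocking is needed so that resources can be yielded to other work.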

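### 3. Hardware heat and power limits (battery-powered environments)
Suppose the high-performance queue is deployed not on a backend server but in an environment with limited battery and cooling (for example, an autonomous vehicle's onboard computer, a robot control system, or an edge device).
* A lock-free endless loop keeps the CPU under constant load, which causes considerable heat and battery drain.
* Even when high throughput is needed to process sensor data, keeping the system from throttling or shutting down because of heat calls for a Blocking approach in which threads wake up only when there is work to do.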

### 4. Business domain requirements (throughput vs. latency)
Using the Disruptor does not mean every task has to finish within 1 millisecond.
* **Async logging:** Log4j2 uses the Disruptor internally. Writing log data to disk 10 to 20 milliseconds late causes no problem for the system.
* For background work where **processing large volumes without falling behind (throughput)** matters more than latency, there is no reason to waste a CPU core while waiting.

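### Summary
`BlockingWaitStrategy` is indeed the 'slowest' of the wait strategies the Disruptor provides. But that is **only a comparison within the Disruptor ecosystem**.

Compared with Java's standard lock-based queues such as `ArrayBlockingQueue`, even a Disruptor using `BlockingWaitStrategy` can be considerably faster thanks to its memory layout (cache line padding) and ring buffer structure. That makes `BlockingWaitStrategy` the most basic, general-purpose choice in practice: it **avoids wasting CPU and keeps the server stable while still performing well above an ordinary queue**.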


 

4. Following the Disruptor User Guide

So let's follow the Disruptor User Guide and try it out ourselves.

 

Before starting: lmax:disruptor:4.0.0 ships a DSL (the com.lmax.disruptor.dsl package) that is said to make it convenient to use.

 

Let's first look at the Disruptor class at the heart of it.

/*
 * Copyright 2011 LMAX Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lmax.disruptor.dsl;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.BatchEventProcessorBuilder;
import com.lmax.disruptor.BatchRewindStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventHandlerIdentity;
import com.lmax.disruptor.EventProcessor;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.EventTranslatorThreeArg;
import com.lmax.disruptor.EventTranslatorTwoArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RewindableEventHandler;
import com.lmax.disruptor.RewindableException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.util.Util;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * A DSL-style API for setting up the disruptor pattern around a ring buffer
 * (aka the Builder pattern).
 *
 * <p>A simple example of setting up the disruptor with two event handlers that
 * must process events in order:
 *
 * <pre>
 * <code>Disruptor&lt;MyEvent&gt; disruptor = new Disruptor&lt;MyEvent&gt;(MyEvent.FACTORY, 32, Executors.newCachedThreadPool());
 * EventHandler&lt;MyEvent&gt; handler1 = new EventHandler&lt;MyEvent&gt;() { ... };
 * EventHandler&lt;MyEvent&gt; handler2 = new EventHandler&lt;MyEvent&gt;() { ... };
 * disruptor.handleEventsWith(handler1);
 * disruptor.after(handler1).handleEventsWith(handler2);
 *
 * RingBuffer ringBuffer = disruptor.start();</code>
 * </pre>
 *
 * @param <T> the type of event used.
 */
public class Disruptor<T>
{
    private final RingBuffer<T> ringBuffer;
    private final ThreadFactory threadFactory;
    private final ConsumerRepository consumerRepository = new ConsumerRepository();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<>();

    /**
     * Create a new Disruptor. Will default to {@link com.lmax.disruptor.BlockingWaitStrategy} and
     * {@link ProducerType}.MULTI
     *
     * @param eventFactory   the factory to create events in the ring buffer.
     * @param ringBufferSize the size of the ring buffer.
     * @param threadFactory  a {@link ThreadFactory} to create threads to for processors.
     */
    public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
    {
        this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), threadFactory);
    }

    /**
     * Create a new Disruptor.
     *
     * @param eventFactory   the factory to create events in the ring buffer.
     * @param ringBufferSize the size of the ring buffer, must be power of 2.
     * @param threadFactory  a {@link ThreadFactory} to create threads for processors.
     * @param producerType   the claim strategy to use for the ring buffer.
     * @param waitStrategy   the wait strategy to use for the ring buffer.
     */
    public Disruptor(
            final EventFactory<T> eventFactory,
            final int ringBufferSize,
            final ThreadFactory threadFactory,
            final ProducerType producerType,
            final WaitStrategy waitStrategy)
    {
        this(
            RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
            threadFactory);
    }

    /**
     * Private constructor helper
     */
    private Disruptor(final RingBuffer<T> ringBuffer, final ThreadFactory threadFactory)
    {
        this.ringBuffer = ringBuffer;
        this.threadFactory = threadFactory;
    }

    /**
     * <p>Set up event handlers to handle events from the ring buffer. These handlers will process events
     * as soon as they become available, in parallel.</p>
     *
     * <p>This method can be used as the start of a chain. For example if the handler <code>A</code> must
     * process events before handler <code>B</code>:</p>
     * <pre><code>dw.handleEventsWith(A).then(B);</code></pre>
     *
     * <p>This call is additive, but generally should only be called once when setting up the Disruptor instance</p>
     *
     * @param handlers the event handlers that will process events.
     * @return a {@link EventHandlerGroup} that can be used to chain dependencies.
     */
    @SuppressWarnings("varargs")
    @SafeVarargs
    public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
    {
        return createEventProcessors(new Sequence[0], handlers);
    }

    /**
     * <p>Set up event handlers to handle events from the ring buffer. These handlers will process events
     * as soon as they become available, in parallel.</p>
     *
     * <p>This method can be used as the start of a chain. For example if the handler <code>A</code> must
     * process events before handler <code>B</code>:</p>
     * <pre><code>dw.handleEventsWith(A).then(B);</code></pre>
     *
     * <p>This call is additive, but generally should only be called once when setting up the Disruptor instance</p>
     *
     * @param batchRewindStrategy a {@link BatchRewindStrategy} for customizing how to handle a {@link RewindableException}.
     * @param handlers            the rewindable event handlers that will process events.
     * @return a {@link EventHandlerGroup} that can be used to chain dependencies.
     */
    @SuppressWarnings("varargs")
    @SafeVarargs
    public final EventHandlerGroup<T> handleEventsWith(final BatchRewindStrategy batchRewindStrategy,
                                                       final RewindableEventHandler<? super T>... handlers)
    {
        return createEventProcessors(new Sequence[0], batchRewindStrategy, handlers);
    }

    /**
     * <p>Set up custom event processors to handle events from the ring buffer. The Disruptor will
     * automatically start these processors when {@link #start()} is called.</p>
     *
     * <p>This method can be used as the start of a chain. For example if the handler <code>A</code> must
     * process events before handler <code>B</code>:</p>
     * <pre><code>dw.handleEventsWith(A).then(B);</code></pre>
     *
     * <p>Since this is the start of the chain, the processor factories will always be passed an empty <code>Sequence</code>
     * array, so the factory isn't necessary in this case. This method is provided for consistency with
     * {@link EventHandlerGroup#handleEventsWith(EventProcessorFactory...)} and {@link EventHandlerGroup#then(EventProcessorFactory...)}
     * which do have barrier sequences to provide.</p>
     *
     * <p>This call is additive, but generally should only be called once when setting up the Disruptor instance</p>
     *
     * @param eventProcessorFactories the event processor factories to use to create the event processors that will process events.
     * @return a {@link EventHandlerGroup} that can be used to chain dependencies.
     */
    @SafeVarargs
    public final EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories)
    {
        final Sequence[] barrierSequences = new Sequence[0];
        return createEventProcessors(barrierSequences, eventProcessorFactories);
    }

    /**
     * <p>Set up custom event processors to handle events from the ring buffer. The Disruptor will
     * automatically start this processors when {@link #start()} is called.</p>
     *
     * <p>This method can be used as the start of a chain. For example if the processor <code>A</code> must
     * process events before handler <code>B</code>:</p>
     * <pre><code>dw.handleEventsWith(A).then(B);</code></pre>
     *
     * @param processors the event processors that will process events.
     * @return a {@link EventHandlerGroup} that can be used to chain dependencies.
     */
    public EventHandlerGroup<T> handleEventsWith(final EventProcessor... processors)
    {
        for (final EventProcessor processor : processors)
        {
            consumerRepository.add(processor);
        }

        final Sequence[] sequences = Util.getSequencesFor(processors);

        ringBuffer.addGatingSequences(sequences);

        return new EventHandlerGroup<>(this, consumerRepository, sequences);
    }


    /**
     * <p>Specify an exception handler to be used for any future event handlers.</p>
     *
     * <p>Note that only event handlers set up after calling this method will use the exception handler.</p>
     *
     * @param exceptionHandler the exception handler to use for any future {@link EventProcessor}.
     * @deprecated This method only applies to future event handlers. Use setDefaultExceptionHandler instead which applies to existing and new event handlers.
     */
    @Deprecated
    public void handleExceptionsWith(final ExceptionHandler<? super T> exceptionHandler)
    {
        this.exceptionHandler = exceptionHandler;
    }

    /**
     * <p>Specify an exception handler to be used for event handlers and worker pools created by this Disruptor.</p>
     *
     * <p>The exception handler will be used by existing and future event handlers and worker pools created by this Disruptor instance.</p>
     *
     * @param exceptionHandler the exception handler to use.
     */
    @SuppressWarnings("unchecked")
    public void setDefaultExceptionHandler(final ExceptionHandler<? super T> exceptionHandler)
    {
        checkNotStarted();
        if (!(this.exceptionHandler instanceof ExceptionHandlerWrapper))
        {
            throw new IllegalStateException("setDefaultExceptionHandler can not be used after handleExceptionsWith");
        }
        ((ExceptionHandlerWrapper<T>) this.exceptionHandler).switchTo(exceptionHandler);
    }

    /**
     * Override the default exception handler for a specific handler.
     * <pre>disruptorWizard.handleExceptionsIn(eventHandler).with(exceptionHandler);</pre>
     *
     * @param eventHandler the event handler to set a different exception handler for.
     * @return an ExceptionHandlerSetting dsl object - intended to be used by chaining the with method call.
     */
    public ExceptionHandlerSetting<T> handleExceptionsFor(final EventHandlerIdentity eventHandler)
    {
        return new ExceptionHandlerSetting<>(eventHandler, consumerRepository);
    }

    /**
     * <p>Create a group of event handlers to be used as a dependency.
     * For example if the handler <code>A</code> must process events before handler <code>B</code>:</p>
     *
     * <pre><code>dw.after(A).handleEventsWith(B);</code></pre>
     *
     * @param handlers the event handlers, previously set up with {@link #handleEventsWith(EventHandler[])},
     *                 that will form the barrier for subsequent handlers or processors.
     * @return an {@link EventHandlerGroup} that can be used to setup a dependency barrier over the specified event handlers.
     */
    public final EventHandlerGroup<T> after(final EventHandlerIdentity... handlers)
    {
        final Sequence[] sequences = new Sequence[handlers.length];
        for (int i = 0, handlersLength = handlers.length; i < handlersLength; i++)
        {
            sequences[i] = consumerRepository.getSequenceFor(handlers[i]);
        }

        return new EventHandlerGroup<>(this, consumerRepository, sequences);
    }

    /**
     * Create a group of event processors to be used as a dependency.
     *
     * @param processors the event processors, previously set up with {@link #handleEventsWith(com.lmax.disruptor.EventProcessor...)},
     *                   that will form the barrier for subsequent handlers or processors.
     * @return an {@link EventHandlerGroup} that can be used to setup a {@link SequenceBarrier} over the specified event processors.
     * @see #after(EventHandlerIdentity[])
     */
    public EventHandlerGroup<T> after(final EventProcessor... processors)
    {
        return new EventHandlerGroup<>(this, consumerRepository, Util.getSequencesFor(processors));
    }

    /**
     * Publish an event to the ring buffer.
     *
     * @param eventTranslator the translator that will load data into the event.
     */
    public void publishEvent(final EventTranslator<T> eventTranslator)
    {
        ringBuffer.publishEvent(eventTranslator);
    }

    /**
     * Publish an event to the ring buffer.
     *
     * @param <A>             Class of the user supplied argument.
     * @param eventTranslator the translator that will load data into the event.
     * @param arg             A single argument to load into the event
     */
    public <A> void publishEvent(final EventTranslatorOneArg<T, A> eventTranslator, final A arg)
    {
        ringBuffer.publishEvent(eventTranslator, arg);
    }

    /**
     * Publish a batch of events to the ring buffer.
     *
     * @param <A>             Class of the user supplied argument.
     * @param eventTranslator the translator that will load data into the event.
     * @param arg             An array single arguments to load into the events. One Per event.
     */
    public <A> void publishEvents(final EventTranslatorOneArg<T, A> eventTranslator, final A[] arg)
    {
        ringBuffer.publishEvents(eventTranslator, arg);
    }

    /**
     * Publish an event to the ring buffer.
     *
     * @param <A>             Class of the user supplied argument.
     * @param <B>             Class of the user supplied argument.
     * @param eventTranslator the translator that will load data into the event.
     * @param arg0            The first argument to load into the event
     * @param arg1            The second argument to load into the event
     */
    public <A, B> void publishEvent(final EventTranslatorTwoArg<T, A, B> eventTranslator, final A arg0, final B arg1)
    {
        ringBuffer.publishEvent(eventTranslator, arg0, arg1);
    }

    /**
     * Publish an event to the ring buffer.
     *
     * @param eventTranslator the translator that will load data into the event.
     * @param <A>             Class of the user supplied argument.
     * @param <B>             Class of the user supplied argument.
     * @param <C>             Class of the user supplied argument.
     * @param arg0            The first argument to load into the event
     * @param arg1            The second argument to load into the event
     * @param arg2            The third argument to load into the event
     */
    public <A, B, C> void publishEvent(final EventTranslatorThreeArg<T, A, B, C> eventTranslator, final A arg0, final B arg1, final C arg2)
    {
        ringBuffer.publishEvent(eventTranslator, arg0, arg1, arg2);
    }

    /**
     * <p>Starts the event processors and returns the fully configured ring buffer.</p>
     *
     * <p>The ring buffer is set up to prevent overwriting any entry that is yet to
     * be processed by the slowest event processor.</p>
     *
     * <p>This method must only be called once after all event processors have been added.</p>
     *
     * @return the configured ring buffer.
     */
    public RingBuffer<T> start()
    {
        checkOnlyStartedOnce();
        consumerRepository.startAll(threadFactory);

        return ringBuffer;
    }

    /**
     * Calls {@link com.lmax.disruptor.EventProcessor#halt()} on all of the event processors created via this disruptor.
     */
    public void halt()
    {
        consumerRepository.haltAll();
    }

    /**
     * <p>Waits until all events currently in the disruptor have been processed by all event processors
     * and then halts the processors.  It is critical that publishing to the ring buffer has stopped
     * before calling this method, otherwise it may never return.</p>
     *
     * <p>This method will not shutdown the executor, nor will it await the final termination of the
     * processor threads.</p>
     */
    public void shutdown()
    {
        try
        {
            shutdown(-1, TimeUnit.MILLISECONDS);
        }
        catch (final TimeoutException e)
        {
            exceptionHandler.handleOnShutdownException(e);
        }
    }

    /**
     * <p>Waits until all events currently in the disruptor have been processed by all event processors
     * and then halts the processors.</p>
     *
     * <p>This method will not shutdown the executor, nor will it await the final termination of the
     * processor threads.</p>
     *
     * @param timeout  the amount of time to wait for all events to be processed. <code>-1</code> will give an infinite timeout
     * @param timeUnit the unit the timeOut is specified in
     * @throws TimeoutException if a timeout occurs before shutdown completes.
     */
    public void shutdown(final long timeout, final TimeUnit timeUnit) throws TimeoutException
    {
        final long timeOutAt = System.nanoTime() + timeUnit.toNanos(timeout);
        while (hasBacklog())
        {
            if (timeout >= 0 && System.nanoTime() > timeOutAt)
            {
                throw TimeoutException.INSTANCE;
            }
            // Busy spin
        }
        halt();
    }

    /**
     * The {@link RingBuffer} used by this Disruptor.  This is useful for creating custom
     * event processors if the behaviour of {@link BatchEventProcessor} is not suitable.
     *
     * @return the ring buffer used by this Disruptor.
     */
    public RingBuffer<T> getRingBuffer()
    {
        return ringBuffer;
    }

    /**
     * Get the value of the cursor indicating the published sequence.
     *
     * @return value of the cursor for events that have been published.
     */
    public long getCursor()
    {
        return ringBuffer.getCursor();
    }

    /**
     * The capacity of the data structure to hold entries.
     *
     * @return the size of the RingBuffer.
     * @see com.lmax.disruptor.Sequencer#getBufferSize()
     */
    public long getBufferSize()
    {
        return ringBuffer.getBufferSize();
    }

    /**
     * Get the event for a given sequence in the RingBuffer.
     *
     * @param sequence for the event.
     * @return event for the sequence.
     * @see RingBuffer#get(long)
     */
    public T get(final long sequence)
    {
        return ringBuffer.get(sequence);
    }

    /**
     * Get the {@link SequenceBarrier} used by a specific handler. Note that the {@link SequenceBarrier}
     * may be shared by multiple event handlers.
     *
     * @param handler the handler to get the barrier for.
     * @return the SequenceBarrier used by <i>handler</i>.
     */
    public SequenceBarrier getBarrierFor(final EventHandlerIdentity handler)
    {
        return consumerRepository.getBarrierFor(handler);
    }

    /**
     * Gets the sequence value for the specified event handlers.
     *
     * @param handler eventHandler to get the sequence for.
     * @return eventHandler's sequence
     */
    public long getSequenceValueFor(final EventHandlerIdentity handler)
    {
        return consumerRepository.getSequenceFor(handler).get();
    }

    /**
     * Confirms if all messages have been consumed by all event processors
     */
    private boolean hasBacklog()
    {
        final long cursor = ringBuffer.getCursor();

        return consumerRepository.hasBacklog(cursor, false);
    }

    /**
     * Checks if disruptor has been started
     *
     * @return true when start has been called on this instance; otherwise false
     */
    public boolean hasStarted()
    {
        return started.get();
    }

    EventHandlerGroup<T> createEventProcessors(
            final Sequence[] barrierSequences,
            final EventHandler<? super T>[] eventHandlers)
    {
        checkNotStarted();

        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
        {
            final EventHandler<? super T> eventHandler = eventHandlers[i];

            final BatchEventProcessor<T> batchEventProcessor =
                    new BatchEventProcessorBuilder().build(ringBuffer, barrier, eventHandler);

            if (exceptionHandler != null)
            {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }

            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            processorSequences[i] = batchEventProcessor.getSequence();
        }

        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

        return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
    }

    EventHandlerGroup<T> createEventProcessors(
            final Sequence[] barrierSequences,
            final BatchRewindStrategy batchRewindStrategy,
            final RewindableEventHandler<? super T>[] eventHandlers)
    {
        checkNotStarted();

        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
        {
            final RewindableEventHandler<? super T> eventHandler = eventHandlers[i];

            final BatchEventProcessor<T> batchEventProcessor =
                    new BatchEventProcessorBuilder().build(ringBuffer, barrier, eventHandler, batchRewindStrategy);

            if (exceptionHandler != null)
            {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }

            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            processorSequences[i] = batchEventProcessor.getSequence();
        }

        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

        return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
    }

    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
    {
        if (processorSequences.length > 0)
        {
            ringBuffer.addGatingSequences(processorSequences);
            for (final Sequence barrierSequence : barrierSequences)
            {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }

    EventHandlerGroup<T> createEventProcessors(
            final Sequence[] barrierSequences, final EventProcessorFactory<T>[] processorFactories)
    {
        final EventProcessor[] eventProcessors = new EventProcessor[processorFactories.length];
        for (int i = 0; i < processorFactories.length; i++)
        {
            eventProcessors[i] = processorFactories[i].createEventProcessor(ringBuffer, barrierSequences);
        }

        return handleEventsWith(eventProcessors);
    }

    private void checkNotStarted()
    {
        if (started.get())
        {
            throw new IllegalStateException("All event handlers must be added before calling starts.");
        }
    }

    private void checkOnlyStartedOnce()
    {
        if (!started.compareAndSet(false, true))
        {
            throw new IllegalStateException("Disruptor.start() must only be called once.");
        }
    }

    @Override
    public String toString()
    {
        return "Disruptor{" +
                "ringBuffer=" + ringBuffer +
                ", started=" + started +
                ", threadFactory=" + threadFactory +
                '}';
    }
}
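The `shutdown(timeout, timeUnit)` method above boils down to "spin until no consumer is behind the cursor, or give up at the deadline". Below is a minimal sketch of that idea using only the JDK. `BacklogSketch`, `publish` and `awaitDrained` are hypothetical names made up for illustration, the backlog check is a simplification of what `ConsumerRepository` does, and it throws `java.util.concurrent.TimeoutException` rather than the library's own `TimeoutException`.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of the backlog check; not the library's implementation.
public class BacklogSketch {
    // Last published sequence (-1 means nothing has been published yet).
    private final AtomicLong cursor = new AtomicLong(-1);
    // Last processed sequence of each consumer.
    private final List<AtomicLong> consumerSequences;

    public BacklogSketch(List<AtomicLong> consumerSequences) {
        this.consumerSequences = consumerSequences;
    }

    // Stands in for a producer publishing one more event.
    public void publish() {
        cursor.incrementAndGet();
    }

    // True while at least one consumer is still behind the published cursor.
    public boolean hasBacklog() {
        long published = cursor.get();
        for (AtomicLong sequence : consumerSequences) {
            if (sequence.get() < published) {
                return true;
            }
        }
        return false;
    }

    // Same shape as shutdown(timeout, timeUnit): a negative timeout means "wait forever".
    public void awaitDrained(long timeout, TimeUnit unit) throws TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (hasBacklog()) {
            if (timeout >= 0 && System.nanoTime() > deadline) {
                throw new TimeoutException("consumers are still behind the cursor");
            }
            Thread.onSpinWait(); // busy spin, like the loop in shutdown()
        }
    }
}
```

This also shows why the Javadoc warns that publishing must stop before `shutdown()` is called: if the cursor keeps moving, the backlog may never drain and the loop may never exit.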

 

Now we can put all of this to use through Disruptor.java, which is declared in the dsl package.

 

The steps are as follows.

 

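1. Define the Event that will carry the data.

2. Define an EventFactory that creates events, since the Disruptor needs one in order to pre-allocate them.

3. Create a consumer that processes the Event by implementing EventHandler.

4. Define a main function to demonstrate it.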

 

LongEvent.java

public class LongEvent
{
    private long value;

    public void set(long value)
    {
        this.value = value;
    }

    @Override
    public String toString()
    {
        return "LongEvent{" + "value=" + value + '}';
    }
}

 

LongEventFactory.java

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent>
{
    @Override
    public LongEvent newInstance()
    {
        return new LongEvent();
    }
}

 

LongEventHandler.java

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent>
{
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println("Event: " + event);
    }
}

 

Main.java

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.nio.ByteBuffer;

public class Main {
    public static void main(String[] args) throws Exception
    {
        // Size of the RingBuffer.
        // It must be a power of 2 (2^n) so that slot indexes can be computed with a fast bit mask.
        int bufferSize = 1024;

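        // Create the Disruptor wrapper object that manages events and controls the consumer thread lifecycle.
        // 1. LongEvent::new : the factory that creates the "empty baskets" (POJOs) used to fill the RingBuffer array at startup.
        // 2. bufferSize : all 1024 event objects are pre-allocated up front and reused afterwards.
        //    (The JVM does not guarantee that the objects themselves are laid out contiguously in memory.)
        // 3. DaemonThreadFactory : consumers run on daemon threads, so they do not keep the JVM alive after the main thread exits.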
        Disruptor<LongEvent> disruptor =
                new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // Register the consumer (EventHandler) in the pipeline.
        // Only events that the producer has published, and that are therefore visible to consumers, reach this lambda.
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));

        // Start the configured consumers, each on a thread created through the ThreadFactory, and make the RingBuffer ready for use.
        disruptor.start();

        // Get a reference to the RingBuffer instance created inside the Disruptor so that we can publish to it.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        // Allocate an 8-byte ByteBuffer.
        // It is a reusable scratch area, so that publishing does not create a new Long object or array (and GC work) for every event.
        ByteBuffer bb = ByteBuffer.allocate(8);

        // Infinite loop that plays the role of the producer thread.
        for (long l = 0; true; l++)
        {
            // Overwrite position 0 of the reused buffer with the current loop value (l).
            // Instead of grabbing a new sticky note each time, we keep erasing and rewriting the one whiteboard we have.
            bb.putLong(0, l);

            // Convenience API that performs the three-step commit described earlier (Claim -> Write -> Publish) in one call.
            // The EventTranslatorOneArg is passed as a lambda.
            // Internally: claim the next free sequence (next) -> overwrite the Event in that slot with the buffer value (set) -> publish to notify consumers.
            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
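            /*
             * The lambda references no outer variables, so it is non-capturing and carries no state.
             * In practice the JVM (HotSpot) can create one instance of such a lambda and keep reusing it,
             * although the language specification does not guarantee this.
             * The lambda itself therefore adds no per-event allocation, which keeps GC pressure from publishing close to zero.
             */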

            // Sleep for 1 second so that events are not published too quickly. (This would be removed in a real deployment.)
            Thread.sleep(1000);
        }
    }
}
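The comments in Main.java mention pre-allocation, power-of-two bit masking and the Claim -> Write -> Publish flow. To see those three ideas without the library, here is a small single-threaded sketch. `MiniRing` and its `Translator` interface are hypothetical names of my own that only mimic the shape of `RingBuffer` and `EventTranslatorOneArg`; the real classes add sequencers, barriers, padding and protection against overwriting unread slots, none of which appear here.

```java
import java.util.function.Supplier;

// Hypothetical single-threaded sketch; the real RingBuffer is far more involved.
public class MiniRing<E> {
    // Mimics the shape of EventTranslatorOneArg: fill an existing event from one argument.
    public interface Translator<E, A> {
        void translateTo(E event, long sequence, A arg);
    }

    private final Object[] entries;
    private final int mask;
    private long claimed = -1; // last claimed sequence
    private long cursor = -1;  // last published sequence

    public MiniRing(Supplier<E> factory, int bufferSize) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        entries = new Object[bufferSize];
        mask = bufferSize - 1;
        // Pre-allocation: every slot gets its event object once, up front.
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = factory.get();
        }
    }

    // For a power-of-two size, (sequence & mask) equals (sequence % bufferSize).
    @SuppressWarnings("unchecked")
    public E get(long sequence) {
        return (E) entries[(int) (sequence & mask)];
    }

    public <A> long publishEvent(Translator<E, A> translator, A arg) {
        long sequence = ++claimed;                            // 1. claim
        translator.translateTo(get(sequence), sequence, arg); // 2. write into the reused event
        cursor = sequence;                                    // 3. publish
        return sequence;
    }

    public long getCursor() {
        return cursor;
    }

    public static void main(String[] args) {
        MiniRing<long[]> ring = new MiniRing<>(() -> new long[1], 4);
        for (long v = 0; v < 6; v++) {
            ring.publishEvent((event, sequence, value) -> event[0] = value, v);
        }
        System.out.println("cursor = " + ring.getCursor());
        System.out.println("slot of sequence 4 holds " + ring.get(4)[0]);
    }
}
```

With four slots and six publishes, sequences 4 and 5 wrap around onto the slots first used by sequences 0 and 1, and no new event objects are created along the way. Because the sketch has no gating sequences, it overwrites those slots unconditionally, which is exactly what the real Disruptor prevents.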

 

While working through this example, though, I noticed that the User Guide points out an issue with lambda capturing.

 

Honestly, all I knew was how to use lambdas and that they make code cleaner, so I will write them up properly later in the Java section.
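As a rough preview of the distinction, here is a tiny sketch with only JDK types. `LambdaCaptureSketch` and its two methods are hypothetical names for illustration. A non-capturing lambda uses nothing but its own parameters, while a capturing lambda closes over a variable from the enclosing scope and so needs an object to hold that value. Whether the JVM reuses a lambda instance is an implementation detail rather than something the language specification promises, so the identity checks below are printed without a claimed result.

```java
import java.util.function.LongUnaryOperator;

// Hypothetical sketch of capturing vs non-capturing lambdas.
public class LambdaCaptureSketch {
    // Non-capturing: the body only uses its own parameter x.
    public static LongUnaryOperator nonCapturing() {
        return x -> x + 1;
    }

    // Capturing: the body closes over the method parameter 'delta',
    // so each evaluation needs somewhere to store that value.
    public static LongUnaryOperator capturing(long delta) {
        return x -> x + delta;
    }

    public static void main(String[] args) {
        // Identity of lambda instances is JVM-specific; run this to see what your JVM does.
        System.out.println("non-capturing instance reused: " + (nonCapturing() == nonCapturing()));
        System.out.println("capturing instance reused:     " + (capturing(1) == capturing(1)));
    }
}
```

This is why Main.java passes `bb` as an argument to `publishEvent` instead of referencing it inside the lambda body: the translator stays non-capturing, and the value travels through the method call instead.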

 


I did consider writing more after this point.

 

But when I asked Claude how this Disruptor material could be applied to logging, it wrote the code in about 10 seconds.

 

The takeaway: finding and understanding the core principles on my own took a long time, but applying them does not.

 

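Personally, I think the way to go is to understand the principles well and, since AI writes boilerplate overwhelmingly faster, to set the direction first and then build first and analyze afterwards. That should speed up work enormously.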

 

This is where I will wrap up my study of LMAX Disruptor.

 

As for applying it? I will simply use AI as I go. If I ever build something new with it, I will most likely write and verify it with Claude's help.

 

Objectively speaking, will I ever have to implement this from scratch myself? Probably not.

 

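In the end, just knowing that something like this exists, and that it was created to solve this kind of problem in this kind of situation, already counts as a big win in my book. All too often, code gets thrown together without any thought at all.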

 

Sources

https://mvnrepository.com/artifact/com.lmax/disruptor/4.0.0

https://mechanical-sympathy.blogspot.com/2011/08/false-sharing-java-7.html

https://dev.to/axelncho/using-lmax-disruptor-to-build-a-high-performance-in-memory-event-broker-in-java-6i

https://stackoverflow.com/questions/54429334/lambda-capturing-vs-non-capturing