getting start

2017-01-11 15:24:19来源:oschina作者:锦语冰人点击


https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started



1 event 生产 消费


package disruptor.start;
/**
* https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
*
* 1
*
* one that will pass a single long value from a producer to a consumer,
* where the consumer will simply print out the value.
* Firstly we will define the Event that will carry the data.
*/
public class LongEvent {
private long value;
public void set(long value) {
this.value = value;
}
}
package disruptor.start;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
/**
*
* 2
*
* In order to allow the Disruptor to preallocate these events for us,
* we need to an EventFactory that will perform the construction
*
* Called by the {@link RingBuffer} to pre-populate all the events to fill the RingBuffer.
*/
public class LongEventFactory implements EventFactory {@Override
public LongEvent newInstance() {
return new LongEvent();
}
}package disruptor.start;import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
/**
* 3
*
* Once we have the event defined we need to
* create a consumer that will handle these events
*
* EventHandler - Callback interface to be implemented for processing events as they become available in the {@link RingBuffer}
*
*/
public class LongEventHandler implements EventHandler {
/**
* Called when a publisher has published an event to the {@link RingBuffer}
*
* @param eventpublished to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
* @throws Exception if the EventHandler would like the exception handled further up the chain.
*/
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println(Thread.currentThread().getName() +"Event: " + event);
}
}
package disruptor.start;
import java.nio.ByteBuffer;
import com.lmax.disruptor.EventProcessor;
import com.lmax.disruptor.RingBuffer;
/**
* 4
* need a source for these events, for the sake of an example I am going to assume that
* the data is coming from some sort of I/O device, e.g. network or file in the form of a ByteBuffer.
*/
public class LongEventProducer {
/**
* Ring based store of reusable entries containing the data representing
* an event being exchanged between event producer and {@link EventProcessor}s.
*/
private final RingBuffer ringBuffer;
public LongEventProducer(RingBuffer ringBuffer) {
this.ringBuffer = ringBuffer;
}
/**
* What becomes immediately obvious is that event publication becomes more invovled that using a simple queue.
* This is due to the desire for event preallocation.
* In requires (at the lowest level) a 2-phase approach to message publication,
* i.e. claim the slot in the ring buffer then publish the available data.
* It is also necessary to wrap publication in a try/finally block.
* If we claim a slot in the Ring Buffer (calling RingBuffer.next()) then we must publish this sequence.
* Failing to do can result in corruption of the state of the Disruptor.
* Specially, in the multi-producer case this will result in the consumers stalling and being unable to recover without a restart.
* @param bb
*/
public void onData(ByteBuffer bb) {
long sequence = ringBuffer.next();// Grab the next sequence
try {
LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
// for the sequence
event.set(bb.getLong(0));// Fill with data
} finally {
ringBuffer.publish(sequence);
}
}
}

2 高级用法


package disruptor.start.v3;
import java.nio.ByteBuffer;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import disruptor.start.LongEvent;
/**
* With version 3.0 of the Disruptor a richer Lambda-style API was added to help developers
* by encapsulating this complexity within the Ring Buffer, s
* o post-3.0 the preferred approach for publishing messages is via the
* Event Publisher/Event Translator portion of the API. E.g.
* Date: 2017-01-10
*
* @author duyu3
*/
public class LongEventProducerWithTranslator {
private final RingBuffer ringBuffer;
public LongEventProducerWithTranslator(RingBuffer ringBuffer) {
this.ringBuffer = ringBuffer;
}
//Implementations translate another data representations into events claimed from the {@link RingBuffer}
private static final EventTranslatorOneArg TRANSLATOR =
new EventTranslatorOneArg() {
public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
event.set(bb.getLong(0));
}
};
public void onData(ByteBuffer bb) {
ringBuffer.publishEvent(TRANSLATOR, bb);
}
}

3 main


package disruptor.start.main;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import disruptor.start.LongEvent;
import disruptor.start.LongEventFactory;
import disruptor.start.LongEventHandler;
import disruptor.start.LongEventProducer;
public class LongEventMain {
public static void main(String[] args) throws Exception {
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor disruptor = new Disruptor<>(factory, bufferSize, executor);
// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) {
bb.putLong(0, l);
producer.onData(bb);
Thread.sleep(1000);
}
}
}package disruptor.start.main;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import start.LongEvent;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
* Note how a number of the classes (e.g. handler, translator) are no longer required.
* Also note how the lambda used for publishEvent() only refers to the parameters that are passed in.
*/
public class LongEventMainWithJava8 {
public static void main(String[] args) throws Exception {
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
// Connect the handler
disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) {
bb.putLong(0, l);
// 1.
ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
// 2.
/**
* This would create a capturing lambda,
* meaning that it would need to instantiate an object to hold the ByteBuffer bb variable
* as it passes the lambda through to the publishEvent() call.
* This will create additional (unnecessary) garbage,
* so the call that passes the argument through to the lambda should be preferred if low GC pressure is a requirement.
*/
ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
Thread.sleep(1000);
}
}
}package start.main;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import start.LongEvent;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class LongEventMainGood {
public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch) {
System.out.println(event);
}
public static void translate(LongEvent event, long sequence, ByteBuffer buffer) {
event.set(buffer.getLong(0));
}
public static void main(String[] args) throws Exception {
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
// Connect the handler
disruptor.handleEventsWith(LongEventMainGood::handleEvent);
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) {
bb.putLong(0, l);
ringBuffer.publishEvent(LongEventMainGood::translate, bb);
Thread.sleep(1000);
}
}
}

基本调优


Basic Tuning Options

Using the above approach will work functionally in the widest set of deployment scenarios. However, if you able to make certain assumptions about the hardware and software environment that the Disruptor will run in then you can take advantage of a number of tuning options to improve performance. There are 2 main options for tuning, single vs. multiple producers and alternative wait strategies.


Single vs. Multiple Producers

One of the best ways to improve performance in concurrect systems is to ahere to theSingle Writer Princple, this applies to the Disruptor. If you are in the situation where there will only ever be a single thread producing events into the Disruptor, then you can take advantage of this to gain additional performance.


public class LongEventMain
{
public static void main(String[] args) throws Exception
{
//.....
// Construct the Disruptor with a SingleProducerSequencer
Disruptor disruptor = new Disruptor(factory,
bufferSize,
ProducerType.SINGLE, // Single producer
new BlockingWaitStrategy(),
executor);
//.....
}
}

To give an indication of how much of a performance advantage can be achieved through this technique we can change the producer type in theOneToOne performance test. Tests run on i7 Sandy Bridge MacBook Air.


Multiple Producer
Run 0, Disruptor=26,553,372 ops/sec
Run 1, Disruptor=28,727,377 ops/sec
Run 2, Disruptor=29,806,259 ops/sec
Run 3, Disruptor=29,717,682 ops/sec
Run 4, Disruptor=28,818,443 ops/sec
Run 5, Disruptor=29,103,608 ops/sec
Run 6, Disruptor=29,239,766 ops/secSingle Producer
Run 0, Disruptor=89,365,504 ops/sec
Run 1, Disruptor=77,579,519 ops/sec
Run 2, Disruptor=78,678,206 ops/sec
Run 3, Disruptor=80,840,743 ops/sec
Run 4, Disruptor=81,037,277 ops/sec
Run 5, Disruptor=81,168,831 ops/sec
Run 6, Disruptor=81,699,346 ops/secAlternative Wait Strategies

The default wait strategy used by the Disruptor is the BlockingWaitStrategy. Internally the BlockingWaitStrategy uses a typical lock and condition variable to handle thread wake-up. The BlockingWaitStrategy is the slowest of the available wait strategies, but is the most conservative with the respect to CPU usage and will give the most consistent behaviour across the widest variety of deployment options. However, again knowledge of the deployed system can allow for additional performance.


SleepingWaitStrategy

Like the BlockingWaitStrategy the SleepingWaitStrategy it attempts to be conservative with CPU usage, by using a simple busy wait loop, but uses a call toLockSupport.parkNanos(1)in the middle of the loop. On a typical Linux system this will pause the thread for around 60µs. However it has the benefit that the producing thread does not need to take any action other increment the appropriate counter and does not require the cost of signalling a condition variable. However, the mean latency of moving the event between the producer and consumer threads will be higher. It works best in situations where low latency is not required, but a low impact on the producing thread is desired. A common use case is for asynchronous logging.


YieldingWaitStrategy

The YieldingWaitStrategy is one of 2 Wait Strategies that can be use in low latency systems, where there is the option to burn CPU cycles with the goal of improving latency. The YieldingWaitStrategy will busy spin waiting for the sequence to increment to the appropriate value. Inside the body of the loopThread.yield()will be called allowing other queued threads to run. This is the recommended wait strategy when need very high performance and the number of Event Handler threads is less than the total number of logical cores, e.g. you have hyper-threading enabled.


BusySpinWaitStrategy

The BusySpinWaitStrategy is the highest performing Wait Strategy, but puts the highest constraints on the deployment environment. This wait strategy should only be used if the number of Event Handler threads is smaller than the number ofphysicalcores on the box. E.g. hyper-threading should be disabled.

最新文章

123

最新摄影

微信扫一扫

第七城市微信公众平台