1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| public class ProducerDemo { public static void main(String[] args) { boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("client.id", "DemoProducer");
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer<>(props);
String topic = "test";
int messageNo = 1;
while (true) { String messageStr = "Message_" + messageNo; long startTime = System.currentTimeMillis();
if (isAsync) {
producer.send(new ProducerRecord<>(topic, messageNo, messageStr), new DemoCallBack(startTime, messageNo, messageStr)); } else { try {
producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get(); System.out.printf("Send message: (%d, %s)\n", messageNo, messageStr); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } ++messageNo; } } }
/**
 * Send-completion callback that prints the outcome of one asynchronous send.
 *
 * <p>Remembers when the send was started together with the record's key and value so
 * that the completion message can include them and the elapsed time.
 */
class DemoCallBack implements Callback {
    private final long startTime;
    private final int key;
    private final String message;

    /**
     * @param startTime wall-clock time in milliseconds at which the send was issued
     * @param key       key of the record that was sent
     * @param message   value of the record that was sent
     */
    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    /**
     * Prints the partition, offset and elapsed milliseconds on success, or the stack
     * trace of the failure otherwise.
     *
     * @param metadata  metadata of the record that was sent
     * @param exception the error raised while sending, or {@code null} on success
     */
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        // Decide on the exception first: a non-null metadata does not by itself prove
        // success, because a client may hand over placeholder metadata on failure.
        if (exception != null) {
            exception.printStackTrace();
        } else if (metadata != null) {
            System.out.printf("message: (%d, %s) send to partition %d, offset: %d, in %d\n",
                    key, message, metadata.partition(), metadata.offset(), elapsedTime);
        }
        // Both null: nothing to report (previously this dereferenced a null exception).
    }
}
|