0
イムカフカと協力最初の数秒でメッセージを生成するために遅く、私はそのようなプロデューサー行わ:あなたはそれが非常に簡単である見ることができるようカフカは
synchronized (obj) {
while (true){
long start = Instant.now().toEpochMilli();
for (int i=0; i< NUM_MSG_SEC ; i++)
{
PriceStreamingData data = PriceStreamingData.newBuilder()
.setUser(getRequest().getUser())
.setSecurity(getRequest().getSecurity())
.setTimestamp(Instant.now().toEpochMilli())
.setPrice(new Random().nextDouble()*200)
.build();
record = new ProducerRecord<>(topic, keyBuilder.build(data),
data);
producer.send(record,new Callback(){
@Override
public void onCompletion(RecordMetadata arg0, Exception arg1) {
counter.incrementAndGet();
if(arg1 != null){
arg1.printStackTrace();
}
}
});
}
long diffCiclo = Instant.now().toEpochMilli() - start;
long diff = Instant.now().toEpochMilli() - startTime;
System.out.println("Number of sent: " + counter.get() +
" Millisecond:" + (diff) + " - NumberOfSent/Diff(K): " + counter.get()/diff);
try {
if(diffCiclo >= 1000){
System.out.println("over 1 second: " + diffCiclo);
}
else {
obj.wait(1000 - diffCiclo);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
、それだけで新しいメッセージを作成し、それを送信します。 私がログ表示された場合:最初の10秒で
NumberOfSent/Diff(K)
を、それは非常に悪い行うだけ
30k per second
60秒後に私は
180k per second
理由がありますか?どのようにしてすでに180kにプロセスを開始することができますか?
私カフカのプロデューサーの構成はFollwing
Async producer (but also with sync producer the situation dose not change)
ACKS_CONFIG = 0
BATCH_SIZE_CONFIG = 20000
COMPRESSION_TYPE_CONFIG = none
LINGER_MS_CONFIG = 0
細部である:
NUM_MSG_SEC is set to 200000 or bigger number
遅延を引き起こす 'obj'にロックがありますか? 'isRunning()'がtrueを返すのはいつですか? –
他に何もobjをロックしていない、遅れを引き起こすものは何もないと思います。遅延はどこかではないのですが、コードは非常にシンプルですが、私はそれがkafka設定の周りにあると思います。(isRunningは常にtrueです) –
おそらく 'synchronized(obj)'の直後にlog文を追加して、コードが実際に実行された時期を判断してください。おそらくあなたのコードが実行される前に何が起こっているかを見るためにデバッグでも実行されます。 –