FlinkKafkaPublisher を使用してストリームデータに書き込みます

1 行で使用します

以下のスニペットでは、ストリーム レイヤーに 1 行で公開する方法を示します。

Scala
Java
// give an iterator of PendingPartition`s
val pendingPartitions: Iterator[NewPartition] =
  Iterator.single(
    NewPartition("partition", "layer-id", NewPartition.ByteArrayData(Array(1.toByte))))

// publish partitions into streaming layer
FlinkKafkaPublisher(hrn, pendingPartitions)
// give an iterator of PendingPartition`s
Iterator<PendingPartition> pendingPartitions = getPendingPartitions();

// publish partitions into streaming layer
FlinkKafkaPublisher.apply(hrn, pendingPartitions);

注 : 使用状況の詳細

FlinkKafkaPublisher クラスは非標準の Flink ソリューションです。 ストリーム処理のパフォーマンスが低下します。 パーティションをストリーム レイヤーに書き込む場合は、一般的な方法を使用することをお勧めします writeEngine.publish()

複数回パブリッシュしてパフォーマンスを向上する必要がある場合に使用します

以下のスニペットでは、パフォーマンスを向上させてストリーム レイヤーに公開する方法を示します。 複数回パブリッシュし、終了時に終了する必要がある場所。

Scala
Java
new RichMapFunction[String, String]() {

  lazy val publisher = new FlinkKafkaPublisher(hrn)

  override def map(value: String): String = {

    // give an iterator of PendingPartition`s
    val pendingPartitions: Iterator[NewPartition] =
      Iterator.single(
        NewPartition("partition", "layer-id", NewPartition.ByteArrayData(Array(1.toByte))))

    // publish partitions into streaming layer
    publisher.publish(pendingPartitions)

    "some data"
  }

  override def close(): Unit = publisher.terminate()
}
class SomeMapFunction extends RichMapFunction<Partition, Tuple2<Partition, byte[]>>
    implements Serializable {
  private final HRN hrn;

  private transient FlinkKafkaPublisher publisher;

  public SomeMapFunction(HRN hrn) {
    this.hrn = hrn;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    // initialize publisher
    publisher = new FlinkKafkaPublisher(hrn);
  }

  @Override
  public void close() throws Exception {
    // close publisher
    publisher.terminate();
  }

  @Override
  public Tuple2<Partition, byte[]> map(Partition partition) throws Exception {
    byte[] data = "some data".getBytes();
    Iterator<PendingPartition> pendingPartitions = getPendingPartitions();

    // publish partitions into streaming layer
    FlinkKafkaPublisher.apply(hrn, pendingPartitions);

    return new Tuple2<Partition, byte[]>(partition, data);
  }
}

」に一致する結果は 件です

    」に一致する結果はありません