Tuesday, December 5, 2023

Apache Flink - FlinkKafkaConsumer Class

Example - FlinkKafkaConsumer + StreamingFileSink
Here's how. This reads from Kafka and writes to an HDFS sink:
// Create a StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create a Kafka source
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
    "topic-name", 
    new SimpleStringSchema(), 
    kafkaProperties
);

// Add the source to the environment
DataStream<String> stream = env.addSource(kafkaConsumer);

// Process the data 
DataStream<String> processedStream = stream.map(new ProcessingFunction());

// Write the processed data to a Sink (e.g., HDFS)
StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("hdfs://output-path"), new SimpleStringEncoder<String>("UTF-8"))
    .build();

processedStream.addSink(sink);

// Execute the Flink job
env.execute("Flink Data Lakehouse Job");
Example - FlinkKafkaConsumer + MongoDBSink
Here's how. This reads from Kafka (a Redpanda broker in this example) and writes to a MongoDB sink:
// Sets up the execution environment, which is the main entry point
// to build Flink applications.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure the Redpanda source
Properties kafkaProps = new Properties();
kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "redpanda-0:9092");
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");

// Create the Redpanda source connector
FlinkKafkaConsumer<String> kafkaConsumer = 
  new FlinkKafkaConsumer<>("financial-transactions",
    new SimpleStringSchema(), kafkaProps);

// Add the Redpanda source to the execution environment
DataStream<String> transactionStream = env.addSource(kafkaConsumer);

// Configure the MongoDB sink
// Add the MongoDB sink to the transaction stream
transactionStream.addSink(new MongoDBSink());

// Execute the Flink job
env.execute("Fraud Detection App");
For the Mongo sink, we do the following:
public class MongoDBSink extends RichSinkFunction<String> {
  private static final Logger LOG = LoggerFactory.getLogger(MongoDBSink.class);
  private transient MongoCollection<Document> collection;
  private transient MongoClient mongoClient;

  @Override
  public void open(Configuration parameters) {
    // Create the MongoDB client to establish connection
    MongoClientSettings settings = MongoClientSettings.builder()
      .applyToClusterSettings(builder ->
        builder.hosts(Collections.singletonList(new ServerAddress("mongo", 27017))))
      .codecRegistry(createCodecRegistry())
      .build();

    mongoClient = MongoClients.create(settings);

    // Access the MongoDB database and collection.
    // If the database and collection do not exist yet, MongoDB creates them
    // automatically on the first write.
    MongoDatabase database = mongoClient.getDatabase("finance");
    collection = database.getCollection("financial_transactions");
  }

  @Override
  public void invoke(String value, Context context) {
    // Consume the event from the Redpanda topic
    LOG.info("Consumed event : " + value);
    Document transactionDocument = Document.parse(value);
    LOG.info("transactionDocument is : "+ transactionDocument);
    // Optionally you can add fraud detection logic as an additional exercise task
    // from your end here
    // ...

    // Insert into MongoDB collection
    collection.insertOne(transactionDocument);
  }

  @Override
  public void close() {
    // Close the MongoDB client to release the connection
    if (mongoClient != null) {
      mongoClient.close();
    }
  }

  private CodecRegistry createCodecRegistry() {
    // Helper method that builds the CodecRegistry used by the MongoDB client.
    // A CodecRegistry encodes and decodes Java objects to and from BSON
    // (Binary JSON), the native format MongoDB uses to store and retrieve data.
    return CodecRegistries.fromRegistries(
      MongoClientSettings.getDefaultCodecRegistry(),
      CodecRegistries.fromProviders(
        PojoCodecProvider.builder().automatic(true).build())
    );
  }
}
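Document.parse expects each Kafka message to be a JSON string. As an illustration (the field names are hypothetical, not taken from the source), an event on the financial-transactions topic would be handled roughly like this:
// Hypothetical transaction event; the field names are illustrative only
String sampleEvent = "{\"transactionId\": \"tx-1001\", \"amount\": 250.0, \"currency\": \"USD\"}";
Document doc = Document.parse(sampleEvent);  // becomes a BSON document with those fields
// collection.insertOne(doc);                // this is what invoke() does for each event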
Example
Here's how:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaFlinkIntegration {
  public static void main(String[] args) throws Exception {
    // Setting up the streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment
      .getExecutionEnvironment();

    // Kafka consumer configuration
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "test");

    // Creating a data stream from Kafka
    DataStream<String> stream = env
      .addSource(new FlinkKafkaConsumer<>("test_topic",
        new SimpleStringSchema(), properties));

    // Processing the stream
    stream.flatMap(new Splitter())
      .print();

    env.execute("Kafka Flink Integration Example");
  }
}
For the FlatMapFunction, we define Splitter as a static nested class of KafkaFlinkIntegration:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;


// Simple FlatMap function to split the data
public static class Splitter implements FlatMapFunction<String, String> {
  @Override
  public void flatMap(String value, Collector<String> out) throws Exception {
    for (String word: value.split(" ")) {
      out.collect(word);
    }
  }
}
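Note that FlinkKafkaConsumer is deprecated in recent Flink releases in favor of the new KafkaSource connector. As a rough sketch, assuming Flink 1.14+ and the flink-connector-kafka module, the same source could be built like this:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Build the source with the unified KafkaSource API
KafkaSource<String> source = KafkaSource.<String>builder()
  .setBootstrapServers("localhost:9092")
  .setTopics("test_topic")
  .setGroupId("test")
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setValueOnlyDeserializer(new SimpleStringSchema())
  .build();

// fromSource replaces addSource for the new source API
DataStream<String> stream =
  env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");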
Example - FlinkKafkaConsumer for Avro
We need to add the Kafka and Avro dependencies to the project. Here's how:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-streams-avro-serde</artifactId>
  <version>${confluent.version}</version>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry-client</artifactId>
  <version>${confluent.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro-confluent-registry</artifactId>
  <version>${flink.version}</version>
</dependency>
Here's how:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "<bootstrap.servers>");
properties.setProperty("security.protocol", "SASL_SSL");
properties.setProperty("sasl.mechanism", "PLAIN");
properties.setProperty("sasl.jaas.config", "<sasl.jaas.config>");
properties.setProperty("group.id", "<group id>");
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("schema.registry.url", "<schema.registry.url>");
properties.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
properties.setProperty(KafkaAvroDeserializerConfig.VALUE_SUBJECT_NAME_STRATEGY,
  TopicRecordNameStrategy.class.getName());
// Deserialize Confluent-serialized Avro records via the Schema Registry
FlinkKafkaConsumer<MyRecord> kafkaConsumer = new FlinkKafkaConsumer<>(
  "<topic>",
  ConfluentRegistryAvroDeserializationSchema.forSpecific(MyRecord.class,
    "<schema.registry.url>"),
  properties
);
    
kafkaConsumer.setStartFromEarliest();
env.addSource(kafkaConsumer)
  .map(record -> {
    // TODO: Add custom logic here
    return record;
  })
  .print();
    
env.execute("Flink Kafka Consumer");

