Example - FlinkKafkaConsumer + StreamingFileSink
We do it like this. Here, data is read from Kafka and written to an HDFS sink.
// Create a StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka consumer properties (set bootstrap.servers, group.id, ...)
Properties kafkaProperties = new Properties();
// Create a Kafka source
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
  "topic-name",
  new SimpleStringSchema(),
  kafkaProperties);
// Add the source to the environment
DataStream<String> stream = env.addSource(kafkaConsumer);
// Process the data
DataStream<String> processedStream = stream.map(new ProcessingFunction());
// Write the processed data to a sink (e.g., HDFS)
StreamingFileSink<String> sink = StreamingFileSink
  .forRowFormat(new Path("hdfs://output-path"), new SimpleStringEncoder<String>("UTF-8"))
  .build();
processedStream.addSink(sink);
// Execute the Flink job
env.execute("Flink Data Lakehouse Job");
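The ProcessingFunction used above is not defined in the snippet. A minimal sketch, assuming it is a plain MapFunction that trims and upper-cases each record; the logic here is illustrative only.
import org.apache.flink.api.common.functions.MapFunction;
// Hypothetical implementation of the ProcessingFunction referenced above;
// any per-record transformation could go here instead.
public class ProcessingFunction implements MapFunction<String, String> {
  @Override
  public String map(String value) throws Exception {
    // Illustrative transformation: trim whitespace and upper-case the record
    return value.trim().toUpperCase();
  }
}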
Example - FlinkKafkaConsumer + MongoDBSink
We do it like this. Here, data is read from Kafka and written to a MongoDB sink.
// Sets up the execution environment, which is the main entry point
// to build Flink applications.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configure the Redpanda source
Properties kafkaProps = new Properties();
kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "redpanda-0:9092");
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");
// Create the Redpanda source connector
FlinkKafkaConsumer<String> kafkaConsumer =
new FlinkKafkaConsumer<>("financial-transactions",
new SimpleStringSchema(), kafkaProps);
// Add the Redpanda source to the execution environment
DataStream<String> transactionStream = env.addSource(kafkaConsumer);
// Add the custom MongoDB sink (implemented below) to the transaction stream
transactionStream.addSink(new MongoDBSink());
// Execute the Flink job
env.execute("Fraud Detection App");
For the Mongo sink, we do it like this.
public class MongoDBSink extends RichSinkFunction<String> {
private static final Logger LOG = LoggerFactory.getLogger(MongoDBSink.class);
private transient MongoClient mongoClient;
private transient MongoCollection<Document> collection;
@Override
public void open(Configuration parameters) {
// Create the MongoDB client to establish connection
MongoClientSettings settings = MongoClientSettings.builder()
.applyToClusterSettings(builder ->
builder.hosts(Collections.singletonList(new ServerAddress("mongo", 27017))))
.codecRegistry(createCodecRegistry())
.build();
// Keep a reference to the client so it can be closed in close()
mongoClient = MongoClients.create(settings);
// Access the MongoDB database and collection
// At this stage, if the MongoDB database and collection do not exist,
// they would automatically be created
MongoDatabase database = mongoClient.getDatabase("finance");
collection = database.getCollection("financial_transactions");
}
@Override
public void invoke(String value, Context context) {
// Consume the event from the Redpanda topic
LOG.info("Consumed event : " + value);
Document transactionDocument = Document.parse(value);
LOG.info("transactionDocument is : "+ transactionDocument);
// Optionally, fraud detection logic can be added here as an exercise,
// e.g. by calling the hypothetical isSuspicious(...) helper sketched below
// ...
// Insert into MongoDB collection
collection.insertOne(transactionDocument);
}
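// NOTE: hypothetical helper, not part of the original example. It sketches the
// kind of rule-based check that the exercise comment in invoke() refers to;
// the "amount" field name and the 10 000 threshold are assumptions.
private boolean isSuspicious(Document transactionDocument) {
Object amount = transactionDocument.get("amount");
return amount instanceof Number && ((Number) amount).doubleValue() > 10_000;
}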
@Override
public void close() {
// Release the MongoDB client and its connection pool
if (mongoClient != null) {
mongoClient.close();
}
}
private CodecRegistry createCodecRegistry() {
// Helper that builds the CodecRegistry for the MongoDB client.
// A CodecRegistry encodes and decodes Java objects to and from BSON
// (Binary JSON), the native format MongoDB uses to store and retrieve data.
return CodecRegistries.fromRegistries(
MongoClientSettings.getDefaultCodecRegistry(),
CodecRegistries.fromProviders(PojoCodecProvider.builder().automatic(true)
.build())
);
}
}
Example
We do it like this
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaFlinkIntegration {
public static void main(String[] args) throws Exception {
// Setting up the streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
// Kafka consumer configuration
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
// Creating a data stream from Kafka
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer<>("test_topic",
new SimpleStringSchema(), properties));
// Processing the stream
stream.flatMap(new Splitter())
.print();
env.execute("Kafka Flink Integration Example");
}
}
For the FlatMapFunction, we do it like this
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
// Simple FlatMap function to split the data
public static class Splitter implements FlatMapFunction<String, String> {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
for (String word: value.split(" ")) {
out.collect(word);
}
}
}
Example - FlinkKafkaConsumer for Avro
The Kafka and Avro dependencies need to be added to the project. We do it like this
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>${confluent.version}</version>
</dependency>
We do it like this
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "<bootstrap.servers>");
properties.setProperty("security.protocol", "SASL_SSL");
properties.setProperty("sasl.mechanism", "PLAIN");
properties.setProperty("sasl.jaas.config", "<sasl.jaas.config>");
properties.setProperty("group.id", "<group id>");
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("schema.registry.url", "<schema.registry.url>");
properties.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
properties.setProperty(KafkaAvroDeserializerConfig.VALUE_SUBJECT_NAME_STRATEGY,
TopicRecordNameStrategy.class.getName());
// ConfluentRegistryAvroDeserializationSchema (from flink-avro-confluent-registry)
// reads the Confluent wire format and fetches the writer schema from the registry
FlinkKafkaConsumer<MyRecord> kafkaConsumer = new FlinkKafkaConsumer<>(
  "<topic>",
  ConfluentRegistryAvroDeserializationSchema.forSpecific(MyRecord.class,
    "<schema.registry.url>"),
  properties
);
kafkaConsumer.setStartFromEarliest();
env.addSource(kafkaConsumer)
.map(record -> {
// TODO: Add custom logic here
return record;
})
.print();
env.execute("Flink Kafka Consumer");