Example - FlinkKafkaConsumer + FileSink
We do it like this. Here data is read from Kafka and written to an HDFS sink.
// Create a StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create a Kafka source
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
    "topic-name",
    new SimpleStringSchema(),
    kafkaProperties);

// Add the source to the environment
DataStream<String> stream = env.addSource(kafkaConsumer);

// Process the data
DataStream<String> processedStream = stream.map(new ProcessingFunction());

// Write the processed data to a sink (e.g., HDFS)
StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("hdfs://output-path"), new SimpleStringEncoder<String>("UTF-8"))
    .build();
processedStream.addSink(sink);

// Execute the Flink job
env.execute("Flink Data Lakehouse Job");
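Note that although the heading says FileSink, the code above uses the older StreamingFileSink. In newer Flink versions the unified FileSink connector is the recommended replacement. A minimal sketch of the same sink with that API, reusing the processedStream and the "hdfs://output-path" location from the example above:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

// Build a row-format FileSink that writes each String record as a UTF-8 line
FileSink<String> fileSink = FileSink
    .forRowFormat(new Path("hdfs://output-path"), new SimpleStringEncoder<String>("UTF-8"))
    .build();

// FileSink is a unified sink, so it is attached with sinkTo() instead of addSink()
processedStream.sinkTo(fileSink);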
Example - FlinkKafkaConsumer + MongoDBSink
We do it like this. Here data is read from Kafka and written to a Mongo sink.
// Sets up the execution environment, which is the main entry point
// to build Flink applications.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure the Redpanda source
Properties kafkaProps = new Properties();
kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "redpanda-0:9092");
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");

// Create the Redpanda source connector
FlinkKafkaConsumer<String> kafkaConsumer =
    new FlinkKafkaConsumer<>("financial-transactions", new SimpleStringSchema(), kafkaProps);

// Add the Redpanda source to the execution environment
DataStream<String> transactionStream = env.addSource(kafkaConsumer);

// Add the MongoDB sink to the transaction stream
transactionStream.addSink(new MongoDBSink());

// Execute the Flink job
env.execute("Fraud Detection App");
For the Mongo sink, we do it like this.
public class MongoDBSink extends RichSinkFunction<String> {

  private static final Logger LOG = LoggerFactory.getLogger(MongoDBSink.class);

  private transient MongoClient mongoClient;
  private transient MongoCollection<Document> collection;

  @Override
  public void open(Configuration parameters) {
    // Create the MongoDB client to establish the connection
    MongoClientSettings settings = MongoClientSettings.builder()
        .applyToClusterSettings(builder ->
            builder.hosts(Collections.singletonList(new ServerAddress("mongo", 27017))))
        .codecRegistry(createCodecRegistry())
        .build();
    mongoClient = MongoClients.create(settings);

    // Access the MongoDB database and collection.
    // If the database and collection do not exist yet, they are created automatically.
    MongoDatabase database = mongoClient.getDatabase("finance");
    collection = database.getCollection("financial_transactions");
  }

  @Override
  public void invoke(String value, Context context) {
    // Consume the event from the Redpanda topic
    LOG.info("Consumed event : " + value);
    Document transactionDocument = Document.parse(value);
    LOG.info("transactionDocument is : " + transactionDocument);

    // Optionally, fraud detection logic can be added here as an additional exercise
    // ...

    // Insert into the MongoDB collection
    collection.insertOne(transactionDocument);
  }

  @Override
  public void close() {
    // Clean up resources
    if (mongoClient != null) {
      mongoClient.close();
    }
  }

  private CodecRegistry createCodecRegistry() {
    // Creates a CodecRegistry for MongoDB. A CodecRegistry is responsible for encoding
    // and decoding Java objects to and from BSON (Binary JSON), the native format
    // MongoDB uses to store and retrieve data.
    return CodecRegistries.fromRegistries(
        MongoClientSettings.getDefaultCodecRegistry(),
        CodecRegistries.fromProviders(PojoCodecProvider.builder().automatic(true).build())
    );
  }
}
Example
We do it like this
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaFlinkIntegration {

  public static void main(String[] args) throws Exception {
    // Set up the streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Kafka consumer configuration
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "test");

    // Create a data stream from Kafka
    DataStream<String> stream = env.addSource(
        new FlinkKafkaConsumer<>("test_topic", new SimpleStringSchema(), properties));

    // Process the stream
    stream.flatMap(new Splitter())
        .print();

    env.execute("Kafka Flink Integration Example");
  }
}
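FlinkKafkaConsumer is deprecated in recent Flink releases in favor of the unified KafkaSource connector. A minimal sketch of the same pipeline with KafkaSource, reusing the topic, server and group id from the example above:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;

// Build a KafkaSource with the same topic, bootstrap servers and group id
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("test_topic")
    .setGroupId("test")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

// fromSource() replaces addSource() for the new unified source API
DataStream<String> stream =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");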
For the FlatMapFunction, we do it like this
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Simple FlatMap function to split the data
public static class Splitter implements FlatMapFunction<String, String> {

  @Override
  public void flatMap(String value, Collector<String> out) throws Exception {
    for (String word : value.split(" ")) {
      out.collect(word);
    }
  }
}
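As a usage note, the emitted words are typically keyed and counted afterwards. A minimal sketch of that follow-up, using a hypothetical Tuple2-emitting variant of the Splitter named Tokenizer:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Hypothetical variant of Splitter that emits (word, 1) pairs
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
  @Override
  public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    for (String word : value.split(" ")) {
      out.collect(Tuple2.of(word, 1));
    }
  }
}

// Key by the word and sum the counts
stream.flatMap(new Tokenizer())
    .keyBy(t -> t.f0)
    .sum(1)
    .print();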
Example - FlinkKafkaConsumer for Avro
The Kafka and Avro dependencies need to be added to the project. We do it like this
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-streams-avro-serde</artifactId>
  <version>${confluent.version}</version>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry-client</artifactId>
  <version>${confluent.version}</version>
</dependency>
We do it like this
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "<bootstrap.servers>");
properties.setProperty("security.protocol", "SASL_SSL");
properties.setProperty("sasl.mechanism", "PLAIN");
properties.setProperty("sasl.jaas.config", "<sasl.jaas.config>");
properties.setProperty("group.id", "<group id>");
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("schema.registry.url", "<schema.registry.url>");
properties.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
properties.setProperty(KafkaAvroDeserializerConfig.VALUE_SUBJECT_NAME_STRATEGY,
    TopicRecordNameStrategy.class.getName());

FlinkKafkaConsumer<MyRecord> kafkaConsumer = new FlinkKafkaConsumer<>(
    "<topic>",
    new AvroDeserializationSchema<>(MyRecord.class, new Schema.Parser().parse(MyRecord.SCHEMA$)),
    properties
);
kafkaConsumer.setStartFromEarliest();

env.addSource(kafkaConsumer)
    .map(record -> {
      // TODO: Add custom logic here
      return record;
    })
    .print();

env.execute("Flink Kafka Consumer");
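When the records are registered in Confluent Schema Registry, Flink's flink-avro-confluent-registry module provides a ready-made deserialization schema, which avoids parsing the Avro schema by hand. A minimal sketch, assuming the same generated MyRecord SpecificRecord class and the placeholder registry URL used above:

import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

// Deserializes Confluent-Avro-encoded records into the generated MyRecord class,
// fetching writer schemas from the schema registry at the given URL
ConfluentRegistryAvroDeserializationSchema<MyRecord> deserializer =
    ConfluentRegistryAvroDeserializationSchema.forSpecific(MyRecord.class, "<schema.registry.url>");

FlinkKafkaConsumer<MyRecord> registryConsumer =
    new FlinkKafkaConsumer<>("<topic>", deserializer, properties);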