Şöyle yaparız. Kafka topic'inden city ve temperature değerlerini okur ve 1 dakikalık ortalama sıcaklığı veri tabanına yazar.
Örnekimport org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.connector.jdbc.JdbcConnectionOptions;import org.apache.flink.connector.jdbc.JdbcExecutionOptions;import org.apache.flink.connector.jdbc.JdbcSink;import org.apache.flink.connector.kafka.source.KafkaSource;import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.kafka.common.TopicPartition;import org.apache.flink.api.common.functions.AggregateFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.api.common.functions.MapFunction;public class Main {static final String BROKERS = "kafka:9092";public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironmentalo.getExecutionEnvironment();System.out.println("Environment created");KafkaSource<Weather> source = KafkaSource.<Weather>builder().setBootstrapServers(BROKERS).setProperty("partition.discovery.interval.ms", "1000").setTopics("weather").setGroupId("groupdId-919292").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new WeatherDeserializationSchema()).build();DataStreamSource<Weather> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka");DataStream<Tuple2<MyAverage, Double>> averageTemperatureStream = kafka .keyBy(myEvent -> myEvent.city).window(TumblingProcessingTimeWindows.of(Time.seconds(60))).aggregate(new AverageAggregator());DataStream<Tuple2<String, Double>> cityAndValueStream = averageTemperatureStream.map(new MapFunction<Tuple2<MyAverage, Double>, Tuple2<String, Double>>() {@Overridepublic Tuple2<String, 
Double> map(Tuple2<MyAverage, Double> input) {return new Tuple2<>(input.f0.city, input.f1);}});cityAndValueStream.addSink(JdbcSink .sink("insert into weather (city, average_temperature) values (?, ?)",(statement, event) -> {statement.setString(1, event.f0);statement.setDouble(2, event.f1);},JdbcExecutionOptions.builder().withBatchSize(1000).withBatchIntervalMs(200).withMaxRetries(5).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:postgresql://docker.for.mac.host.internal:5438/postgres").withDriverName("org.postgresql.Driver").withUsername("postgres").withPassword("postgres").build()));env.execute("Kafka-flink-postgres");}}
Şöyle yaparız
import org.apache.flink.api.common.functions.AggregateFunction;import org.apache.flink.api.java.tuple.Tuple2;// Aggregation function for average.public static class AverageAggregator implements AggregateFunction<Weather, MyAverage, Tuple2<MyAverage, Double>> {@Overridepublic MyAverage createAccumulator() {return new MyAverage();}@Overridepublic MyAverage add(Weather weather, MyAverage myAverage) {myAverage.city = weather.city;myAverage.count = myAverage.count + 1;myAverage.sum = myAverage.sum + weather.temperature;return myAverage;}@Overridepublic Tuple2<MyAverage, Double> getResult(MyAverage myAverage) {return new Tuple2<>(myAverage, myAverage.sum / myAverage.count);}@Overridepublic MyAverage merge(MyAverage myAverage, MyAverage acc1) {myAverage.sum = myAverage.sum + acc1.sum;myAverage.count = myAverage.count + acc1.count;return myAverage;}}
Şöyle yaparız
/**
 * Mutable accumulator for AverageAggregator: a city name plus the running
 * count and sum of the temperature readings seen so far.
 */
public static class MyAverage {
    public String city;
    public Integer count = 0;
    public Double sum = 0d;

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("MyAverage{");
        text.append("city='").append(city).append('\'');
        text.append(", count=").append(count);
        text.append(", sum=").append(sum);
        text.append('}');
        return text.toString();
    }
}
// Closes the enclosing class whose opening brace is outside this snippet.
}
Hiç yorum yok:
Yorum Gönder