Tuesday, December 26, 2023

Logback logback.xml FileAppender Definition

Example
We do it like this. Here, different log levels are sent to different files. ch.qos.logback.classic.filter.ThresholdFilter and ch.qos.logback.classic.filter.LevelFilter are used.
<?xml version="1.0" encoding="UTF-8"?>
<configuration>

  <property name="logging.error.file.name" value="error-log" />
  <property name="logging.error.file.path" value="./"/>
  <property name="logging.file.name" value="info-log" />
  <property name="logging.file.path" value="./"/>

  <!-- Normal log appender -->
  <appender name="INFO_FILE" class="ch.qos.logback.core.FileAppender">
    <file>${logging.file.path}/${logging.file.name}</file>
    <encoder>
      <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
    <!-- Accept INFO and above, then deny WARN and ERROR so that only INFO events remain -->
    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
      <level>INFO</level>
    </filter>
    <filter class="ch.qos.logback.classic.filter.LevelFilter">
      <level>WARN</level>
      <onMatch>DENY</onMatch>
      <onMismatch>NEUTRAL</onMismatch>
    </filter>
    <filter class="ch.qos.logback.classic.filter.LevelFilter">
      <level>ERROR</level>
      <onMatch>DENY</onMatch>
      <onMismatch>NEUTRAL</onMismatch>
    </filter>
  </appender>

  <!-- Error log appender -->
  <appender name="ERROR_FILE" class="ch.qos.logback.core.FileAppender">
    <file>${logging.error.file.path}/${logging.error.file.name}</file>
    <encoder>
      <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
    <!-- Accept WARN and above, deny everything below -->
    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
      <level>WARN</level>
    </filter>
  </appender>

  <root level="INFO">
    <appender-ref ref="INFO_FILE"/>
    <appender-ref ref="ERROR_FILE"/>
  </root>
</configuration>
The explanation is as follows
- Trace < Debug < Info < Warn < Error < Fatal

- LevelFilter filters events based on exact level matching. If the event's level is equal to the configured level, the filter accepts or denies the event, depending on the configuration of the onMatch and onMismatch properties.

- The ThresholdFilter filters events below the specified threshold. 
1. In the normal appender, the ThresholdFilter rejects events below INFO, and the two LevelFilters deny WARN and ERROR, so only INFO events end up in the info-log file.

2. In the error appender, the ThresholdFilter rejects events below WARN, so WARN and ERROR events end up in the error-log file.
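To see the configuration in action, here is a minimal sketch using an SLF4J Logger (the LoggingDemo class name is just an illustration):
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingDemo {

  private static final Logger LOGGER = LoggerFactory.getLogger(LoggingDemo.class);

  public static void main(String[] args) {
    LOGGER.debug("dropped by the root level"); // written to neither file
    LOGGER.info("an info message");            // written only to info-log
    LOGGER.warn("a warning message");          // written only to error-log
    LOGGER.error("an error message");          // written only to error-log
  }
}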



Thursday, December 14, 2023

TestContainers LocalStackContainer Class - Usage With SQS

Example
1. We do it like this. Here, SQS_ENDPOINT_STRATEGY is set to path. This way, the URL generated to access the queue looks like http://localhost:8701/queue/us-east-1/000000000000/myqueue. If we don't do this, we get an UnknownHostException.
2. If desired, the awslocal command can be run inside the container.
@ClassRule
public static LocalStackContainer container = 
  new LocalStackContainer(DockerImageName.parse("localstack/localstack:3.0.2"))
  .withServices(LocalStackContainer.Service.SQS)
  .withEnv("SQS_ENDPOINT_STRATEGY", "path"); // Burası

@BeforeClass
public static void beforeClass() throws Exception {
  container.execInContainer("awslocal",
    "sqs",
    "create-queue",
    "--queue-name",
    "myqueue");
}
To use it with SqsClient we do the following. SqsClient requires the aws.accessKeyId and aws.secretKey system properties; otherwise we get the "Unable to load AWS credentials from any provider in the chain ..." error.
void insertData() throws URISyntaxException {
  System.setProperty("aws.accessKeyId", container.getAccessKey());
  System.setProperty("aws.secretKey", container.getSecretKey());

  SqsClient sqs = SqsClient.builder()
    .endpointOverride(container.getEndpointOverride(LocalStackContainer.Service.SQS))
    .credentialsProvider(
      StaticCredentialsProvider.create(
        AwsBasicCredentials.create(container.getAccessKey(), container.getSecretKey())
      )
    )
    .region(Region.of(container.getRegion()))
    .build();
    ...
}
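Once the queue exists, it can be used like any other SQS queue. A minimal sketch, assuming the sqs client built above and the myqueue queue created in beforeClass (the request classes are under software.amazon.awssdk.services.sqs.model):
String queueUrl = sqs.getQueueUrl(GetQueueUrlRequest.builder()
    .queueName("myqueue")
    .build())
  .queueUrl(); // resolves to the path-style URL shown above

sqs.sendMessage(SendMessageRequest.builder()
  .queueUrl(queueUrl)
  .messageBody("hello")
  .build());

List<Message> messages = sqs.receiveMessage(ReceiveMessageRequest.builder()
  .queueUrl(queueUrl)
  .maxNumberOfMessages(1)
  .build())
  .messages();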

Wednesday, December 13, 2023

Lettuce - Redis Stream Examples

xadd method
Its signature is as follows. It adds new data to the stream
String xadd(K key, Map<K, V> body);
Example
In this example, data is written with Lettuce to a Redis Stream started via Testcontainers. I took part of the example from the Getting Started with Redis Streams and Java article.

For Maven we include the following dependencies
<dependency>
  <groupId>com.redis</groupId>
  <artifactId>testcontainers-redis</artifactId>
  <version>2.0.1</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>io.lettuce</groupId>
  <artifactId>lettuce-core</artifactId>
  <version>6.2.3.RELEASE</version>
</dependency>
We do it like this. Since JUnit 4 is used here, the @ClassRule annotation is present.
import com.redis.testcontainers.RedisContainer;

@ClassRule
public static final RedisContainer container = 
  new RedisContainer(DockerImageName.parse("redis:6.2.6"))
  .withLogConsumer(new Slf4jLogConsumer(LOGGER).withPrefix("Docker"));
We fill it with data as follows. Subscribers can now read the data; see the reader sketch after this example.
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;

void insertData() {
  String redisURI = container.getRedisURI();
  try (RedisClient client = RedisClient.create(redisURI);
       StatefulRedisConnection<String, String> connection = client.connect()) {

    RedisCommands<String, String> syncCommands = connection.sync();

    for (int index = 0; index < ITEM_COUNT; index++) {
      // Redis Streams messages are string key/values in Java.
      Map<String, String> messageBody = createMessageBody(index);

      String messageId = syncCommands.xadd(
        STREAM_NAME,
        messageBody);

        LOGGER.info("Message {} : {} posted", messageId, messageBody);
    }
  }
}

Map<String, String> createMessageBody(int index) {
  Map<String, String> messageBody = new HashMap<>();
  messageBody.put("speed", String.valueOf(index));
  messageBody.put("direction", "270");
  messageBody.put("sensor_ts", String.valueOf(System.currentTimeMillis()));
  return messageBody;
}
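A subscriber can read the entries back, for example with xrange. A minimal reader sketch, assuming the same container, STREAM_NAME and LOGGER used above:
import io.lettuce.core.Range;
import io.lettuce.core.RedisClient;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;

import java.util.List;

void readData() {
  try (RedisClient client = RedisClient.create(container.getRedisURI());
       StatefulRedisConnection<String, String> connection = client.connect()) {

    RedisCommands<String, String> syncCommands = connection.sync();

    // Read every entry currently in the stream
    List<StreamMessage<String, String>> messages =
      syncCommands.xrange(STREAM_NAME, Range.create("-", "+"));

    for (StreamMessage<String, String> message : messages) {
      LOGGER.info("Message {} : {} read", message.getId(), message.getBody());
    }
  }
}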


Tuesday, December 12, 2023

Writing a JDBC Driver

Introduction
I first came across such a thing in the hazelcast-jdbc-core module of the hazelcast-jdbc project.

The classes we need to implement are the following
java.sql.Connection
java.sql.Driver
java.sql.DatabaseMetaData
java.sql.PreparedStatement
java.sql.ResultSet
java.sql.ResultSetMetaData
java.sql.Statement

java.sql.DriverManager Class
There is no need to extend the java.sql.DriverManager class. The explanation is as follows
The DriverManager class included with the java.sql package tracks the loaded JDBC drivers. The client application retrieves the desired database connections through the DriverManager class
getConnection method
The java.sql.DriverManager#getConnection("jdbc:...") method asks every registered Driver, one by one, whether it accepts the given JDBC URL. It returns the connection from the first Driver that accepts it, i.e. the first one that opens a connection. A minimal driver skeleton is sketched below.
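A minimal sketch of such a driver skeleton (MyDriver and MyConnection are hypothetical names; only the java.sql.Driver side is shown):
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.DriverPropertyInfo;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Properties;
import java.util.logging.Logger;

public class MyDriver implements Driver {

  // Register the driver with DriverManager when the class is loaded
  static {
    try {
      DriverManager.registerDriver(new MyDriver());
    } catch (SQLException e) {
      throw new ExceptionInInitializerError(e);
    }
  }

  @Override
  public boolean acceptsURL(String url) {
    // DriverManager.getConnection() uses this to pick the right driver
    return url != null && url.startsWith("jdbc:my:");
  }

  @Override
  public Connection connect(String url, Properties info) throws SQLException {
    if (!acceptsURL(url)) {
      return null; // convention: return null for URLs we do not handle
    }
    return new MyConnection(url, info); // hypothetical java.sql.Connection implementation
  }

  @Override
  public int getMajorVersion() { return 1; }

  @Override
  public int getMinorVersion() { return 0; }

  @Override
  public boolean jdbcCompliant() { return false; }

  @Override
  public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) {
    return new DriverPropertyInfo[0];
  }

  @Override
  public Logger getParentLogger() throws SQLFeatureNotSupportedException {
    throw new SQLFeatureNotSupportedException();
  }
}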


JPA TypedQuery Interface and Pagination

Introduction
Pagination is done with the following calls; a minimal TypedQuery sketch follows the list
.setFirstResult(offset)
.setMaxResults(pageSize)
.getResultList()
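For example, a minimal sketch with a JPQL TypedQuery (assuming the Post entity and entityManager used in the examples below):
int pageSize = 20;
int offset = 0; // first page

List<Post> page = entityManager
  .createQuery("select p from Post p order by p.id", Post.class)
  .setFirstResult(offset)   // skip this many rows
  .setMaxResults(pageSize)  // fetch at most one page
  .getResultList();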

Example
To fetch Post objects we do the following
public List<Post> filterPosts(Integer size, Integer offset) {
  CriteriaBuilder criteriaBuilder = entityManager.getCriteriaBuilder();
  CriteriaQuery<Post> criteriaQuery = criteriaBuilder.createQuery(Post.class);
  Root<Post> root = criteriaQuery.from(Post.class);

  // Optional: Add selection criteria/predicates
  // List<Predicate> predicates = new ArrayList<>();
  // predicates.add(criteriaBuilder.equal(root.get("status"), "published"));
  // CriteriaQuery<Post> query = criteriaQuery.where(predicates);

  List<Post> postList = entityManager
    .createQuery(criteriaQuery)
    .setFirstResult(offset)
    .setMaxResults(size)
    .getResultList();
  return postList;
}
To get the total number of items we do the following
int totalItemsCount(Predicate finalPredicate) {
  try {
    CriteriaBuilder criteriaBuilder = entityManager.getCriteriaBuilder();
    CriteriaQuery<Long> criteriaQuery = criteriaBuilder.createQuery(Long.class);
    
    Root<Post> root = criteriaQuery.from(Post.class);
    // Optional: If joins are involved, you need to specify
    // Join<Post, Comments> joinComments = root.join("comments");

    return Math.toIntExact(
      entityManager.createQuery(
        criteriaQuery.select(criteriaBuilder.count(root)).where(finalPredicate))
      .getSingleResult());
  } catch (Exception e) {
    log.error("Error fetching total count: {}", e.getMessage());
  }
  return 0;
}


Friday, December 8, 2023

The Difference Between new BigDecimal() and BigDecimal.valueOf()

Introduction
1. With new BigDecimal(double) we may not get back the value we passed in
2. With BigDecimal.valueOf() we get back the value we passed in.

The explanation is as follows
It’s all about how doubles are stored in binary. For instance, if you do new BigDecimal(0.1), you won’t just get 0.1. You’ll end up with a really precise value like 0.1000000000000000055511151231257827021181583404541015625. It’s because 0.1 can’t be perfectly represented in binary.
...
That’s why BigDecimal.valueOf is often preferred. It’s more about what you see and expect. But if you need extreme precision, like in scientific calculations, then new BigDecimal is your go-to.
1. new BigDecimal(double) constructor
The explanation is as follows. If a BigDecimal is created from a double, we may not get back the number we passed in
When a double must be used as a source for a BigDecimal, note that this constructor provides an exact conversion; it does not give the same result as converting the double to a String using the Double.toString(double) method and then using the BigDecimal(String) constructor. To get that result, use the static valueOf(double) method.
Example
The following code gives different output, because the BigDecimal.valueOf(double) call converts the double value to a String using the shortest decimal representation.
@Test
public void testUsingBigDecimal(){
  BigDecimal a = new BigDecimal(0.01);
  BigDecimal b = BigDecimal.valueOf(0.01);
  System.out.println("a = " + a);
  System.out.println("b = " + b);
}
//output
//a = 0.01000000000000000020816681711721685132943093776702880859375
//b = 0.01
Example
System.out.println(new BigDecimal(58.34));
This code prints 58.340000000000003410605131648480892181396484375, because 58.34 cannot be represented exactly as a double.

Example
BigDecimal bd = new BigDecimal(0.7d);
When printed, we get the following output.
0.6999999999999999555910790149937383830547332763671875
2. BigDecimal.valueOf() method
The explanation is as follows
Note: This is generally the preferred way to convert a double (or float) into a BigDecimal, as the value returned is equal to that resulting from constructing a BigDecimal from the result of using Double.toString(double).
Under the hood, valueOf() works roughly like the line below. The given double is converted to a String, and that conversion uses the shortest decimal representation. So even though 0.015 cannot be represented exactly as a double, the String conversion still produces 0.015, and we get the BigDecimal value we expect.
new BigDecimal (Double.toString(0.015))
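If the exact decimal value is known up front, the String constructor avoids the issue entirely. A small comparison sketch:
BigDecimal fromDouble = new BigDecimal(0.1);       // 0.1000000000000000055511151231257827021181583404541015625
BigDecimal fromValueOf = BigDecimal.valueOf(0.1);  // 0.1
BigDecimal fromString = new BigDecimal("0.1");     // 0.1

System.out.println(fromValueOf.equals(fromString)); // true  (same value and scale)
System.out.println(fromDouble.equals(fromString));  // false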


Wednesday, December 6, 2023

JPA @OneToOne Bidirectional Relationship

Introduction
A OneToOne relationship can be unidirectional or bidirectional. The explanation is as follows.
For this relationship type, the default data loading method is EAGER: every time you ask for A, the B will also be retrieved.
This relationship is EAGER by default. The default fetch types are as follows
OneToMany: LAZY
ManyToOne: EAGER
ManyToMany: LAZY
OneToOne: EAGER
A bidirectional relationship can be set up in two ways:
1. Primary key and foreign key are different columns
2. Primary key and foreign key are the same

For the relationship to be bidirectional, the parent side must declare mappedBy.


1. Primary key and foreign key are different columns
The child side uses @JoinColumn. Here an index is needed on both columns.


Example
We do it like this. With mappedBy we reference the field in the other class.
@Entity
@Table(name = "PERSON")
public class Person {
  @OneToOne(mappedBy = "person", cascade = CascadeType.ALL)
  private DrivingLicense drivingLicense;
}

@Entity
@Table(name = "DRIVING_LICENSE")
public class DrivingLicense {
  @OneToOne
  @JoinColumn(name = "PERSON_ID", unique = true)
  private Person person;
}
2. Primary key and foreign key are the same
The child side uses @PrimaryKeyJoinColumn.


Example
Here, @PrimaryKeyJoinColumn ensures that both classes share the same Id value. On the parent side we do the following
@Entity
public class Employee {
  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  private Integer id;
  ...
  @OneToOne(mappedBy = "employee", cascade = CascadeType.ALL)
  private ParkingSpace parkingSpace;
}
On the child side we do the following
@Entity
public class ParkingSpace {
  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  private Integer id;
   ...
  @OneToOne
  @PrimaryKeyJoinColumn
  private Employee employee;
}
orphanRemoval = true
In the example above, if the Person object's drivingLicense field is set to null, the DrivingLicense entity is deleted as well; see the sketch below.
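A minimal sketch of the mapping with orphanRemoval enabled (setDrivingLicense is a hypothetical setter on Person):
@OneToOne(mappedBy = "person", cascade = CascadeType.ALL, orphanRemoval = true)
private DrivingLicense drivingLicense;

// Later, inside a transaction: breaking the link is enough,
// the orphaned DRIVING_LICENSE row is deleted on flush
person.setDrivingLicense(null);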

cascade Attribute
1. CascadeType.ALL Value
CascadeType.ALL ensures that if the Person object is deleted, the DrivingLicense object attached to it is automatically deleted as well. However, it differs slightly from orphanRemoval: with orphanRemoval, even setting the drivingLicense field to null causes the license entity to be deleted.

With CascadeType.ALL alone, if we set the drivingLicense field to null we have merely broken the link ourselves, so the DrivingLicense row stays in the database, i.e. it is not deleted.


Note: If we try to delete the DrivingLicense object with session.delete(entity); we get the "org.hibernate.ObjectDeletedException: deleted object would be re-saved by cascade (remove deleted object from associations)" error.

The reason for this error is that we are trying to break only one side of a OneToOne relationship.

As a solution, using a OneToMany relationship instead is sufficient.

fetch Attribute
If it is LAZY, Hibernate runs 2 queries; the second query is issued when the association is accessed.


optional Attribute
The explanation is as follows. If we can set this attribute to false, we should; a small sketch follows the quote below.
When a @OneToOne or a @ManyToOne relationship is mandatory - that is, the entity must have its associated relationship - it is important to tell JPA that this relationship is not optional.
...
Adding optional = false information will allow JPA to be more efficient in creating its select queries because it will know that it necessarily has an address associated with a person. Therefore, it is good practice to always specify this attribute when defining mandatory relationships.
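A minimal sketch, reusing the DrivingLicense side of the example above (fetch = FetchType.LAZY is added here as an assumption):
@Entity
@Table(name = "DRIVING_LICENSE")
public class DrivingLicense {

  // The license cannot exist without its person, so the relationship is mandatory
  @OneToOne(optional = false, fetch = FetchType.LAZY)
  @JoinColumn(name = "PERSON_ID", unique = true, nullable = false)
  private Person person;
}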

Tuesday, December 5, 2023

Apache Flink - KafkaSource Class

Example
We do it like this. It reads city and temperature values from a Kafka topic and writes the 1-minute average to the database
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.TopicPartition;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.api.common.functions.MapFunction;

public class Main {

  static final String BROKERS = "kafka:9092";

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    System.out.println("Environment created");
    KafkaSource<Weather> source = KafkaSource.<Weather>builder()
      .setBootstrapServers(BROKERS)
      .setProperty("partition.discovery.interval.ms", "1000")
      .setTopics("weather")
      .setGroupId("groupdId-919292")
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setValueOnlyDeserializer(new WeatherDeserializationSchema())
      .build();

    DataStreamSource<Weather> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka");

    DataStream<Tuple2<MyAverage, Double>> averageTemperatureStream = kafka
      .keyBy(myEvent -> myEvent.city)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .aggregate(new AverageAggregator());

    DataStream<Tuple2<String, Double>> cityAndValueStream = averageTemperatureStream
      .map(new MapFunction<Tuple2<MyAverage, Double>, Tuple2<String, Double>>() {
        @Override
        public Tuple2<String, Double> map(Tuple2<MyAverage, Double> input) {
          return new Tuple2<>(input.f0.city, input.f1);
        }
      }); 

  
    cityAndValueStream.addSink(JdbcSink.sink("insert into weather (city, average_temperature) values (?, ?)",
          (statement, event) -> {
            statement.setString(1, event.f0);
            statement.setDouble(2, event.f1);
          },
          JdbcExecutionOptions.builder()
            .withBatchSize(1000)
            .withBatchIntervalMs(200)
            .withMaxRetries(5)
            .build(),
          new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withUrl("jdbc:postgresql://docker.for.mac.host.internal:5438/postgres")
            .withDriverName("org.postgresql.Driver")
            .withUsername("postgres")
            .withPassword("postgres")
            .build()
      ));
      env.execute("Kafka-flink-postgres");
    }
}
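The example above uses a WeatherDeserializationSchema that is not shown. A minimal sketch, assuming a Weather POJO with public city and temperature fields and JSON-encoded messages (Jackson is used here as an assumption):
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

public class WeatherDeserializationSchema implements DeserializationSchema<Weather> {

  private static final ObjectMapper MAPPER = new ObjectMapper();

  @Override
  public Weather deserialize(byte[] message) throws IOException {
    // Messages are assumed to look like {"city":"Berlin","temperature":21.5}
    return MAPPER.readValue(message, Weather.class);
  }

  @Override
  public boolean isEndOfStream(Weather nextElement) {
    return false; // unbounded stream
  }

  @Override
  public TypeInformation<Weather> getProducedType() {
    return TypeInformation.of(Weather.class);
  }
}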
Example
The AverageAggregator used above looks like this
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Aggregation function for average.
public static class AverageAggregator implements AggregateFunction<Weather, MyAverage, Tuple2<MyAverage, Double>> {

  @Override
  public MyAverage createAccumulator() {
    return new MyAverage();
  }

  @Override
  public MyAverage add(Weather weather, MyAverage myAverage) {
    myAverage.city = weather.city;
    myAverage.count = myAverage.count + 1;
    myAverage.sum = myAverage.sum + weather.temperature;
    return myAverage;
  }

  @Override
  public Tuple2<MyAverage, Double> getResult(MyAverage myAverage) {
    return new Tuple2<>(myAverage, myAverage.sum / myAverage.count);
  }

  @Override
  public MyAverage merge(MyAverage myAverage, MyAverage acc1) {
    myAverage.sum = myAverage.sum + acc1.sum;
    myAverage.count = myAverage.count + acc1.count;
    return myAverage;
  }
}
The MyAverage accumulator class looks like this
public static class MyAverage {

  public String city;
  public Integer count = 0;
  public Double sum = 0d;

  @Override
  public String toString() {
    return "MyAverage{" +
      "city='" + city + '\'' +
      ", count=" + count +
      ", sum=" + sum +
      '}';
  }
}

Apache Flink - FlinkKafkaConsumer Class

Example - FlinkKafkaConsumer + FileSink
We do it like this. Here, data is read from Kafka and written to an HDFS sink.
// Create a StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create a Kafka source
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
    "topic-name", 
    new SimpleStringSchema(), 
    kafkaProperties
);

// Add the source to the environment
DataStream<String> stream = env.addSource(kafkaConsumer);

// Process the data 
DataStream<String> processedStream = stream.map(new ProcessingFunction());

// Write the processed data to a Sink (e.g., HDFS)
StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("hdfs://output-path"), new SimpleStringEncoder<String>("UTF-8"))
    .build();

processedStream.addSink(sink);

// Execute the Flink job
env.execute("Flink Data Lakehouse Job");
Example - FlinkKafkaConsumer + MongoDBSink
We do it like this. Here, data is read from Kafka (Redpanda) and written to a MongoDB sink.
// Sets up the execution environment, which is the main entry point
// to build Flink applications.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure the Redpanda source
Properties kafkaProps = new Properties();
kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "redpanda-0:9092");
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");

// Create the Redpanda source connector
FlinkKafkaConsumer<String> kafkaConsumer = 
  new FlinkKafkaConsumer<>("financial-transactions",
    new SimpleStringSchema(), kafkaProps);

// Add the Redpanda source to the execution environment
DataStream<String> transactionStream = env.addSource(kafkaConsumer);

// Configure the MongoDB sink
// Add the MongoDB sink to the transaction stream
transactionStream.addSink(new MongoDBSink());

// Execute the Flink job
env.execute("Fraud Detection App");
For the MongoDB sink we do the following
public class MongoDBSink extends RichSinkFunction<String> {
  private static final Logger LOG = LoggerFactory.getLogger(MongoDBSink.class);
  private transient MongoCollection<Document> collection;

  @Override
  public void open(Configuration parameters) {
    // Create the MongoDB client to establish connection
    MongoClientSettings settings = MongoClientSettings.builder()
      .applyToClusterSettings(builder ->
        builder.hosts(Collections.singletonList(new ServerAddress("mongo", 27017))))
      .codecRegistry(createCodecRegistry())
      .build();

    MongoClient mongoClient = MongoClients.create(settings);

    // Access the MongoDB database and collection
    // At this stage, if the MongoDB database and collection do not exist,
    // they would automatically be created
    MongoDatabase database = mongoClient.getDatabase("finance");
    collection = database.getCollection("financial_transactions");
  }

  @Override
  public void invoke(String value, Context context) {
    // Consume the event from the Redpanda topic
    LOG.info("Consumed event : " + value);
    Document transactionDocument = Document.parse(value);
    LOG.info("transactionDocument is : "+ transactionDocument);
    // Optionally you can add fraud detection logic as an additional exercise task
    // from your end here
    // ...

    // Insert into MongoDB collection
    collection.insertOne(transactionDocument);
  }

  @Override
  public void close() {
    // Clean up resources, if needed
  }

  private CodecRegistry createCodecRegistry() {
    // The method createCodecRegistry is a helper method that is used to create a
    // CodecRegistry object for MongoDB.
    // In MongoDB, a CodecRegistry is responsible for encoding and decoding Java
    // objects to
    // BSON (Binary JSON) format, which is the native format used by MongoDB to store
    // and retrieve data.
    return CodecRegistries.fromRegistries(
      MongoClientSettings.getDefaultCodecRegistry(),
        CodecRegistries.fromProviders(PojoCodecProvider.builder().automatic(true)
        .build())
      );
  }
}
Example
We do it like this
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaFlinkIntegration {
  public static void main(String[] args) throws Exception {
    // Setting up the streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment
      .getExecutionEnvironment();

    // Kafka consumer configuration
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "test");

    // Creating a data stream from Kafka
    DataStream<String> stream = env
      .addSource(new FlinkKafkaConsumer<>("test_topic",
        new SimpleStringSchema(), properties));

    // Processing the stream
    stream.flatMap(new Splitter())
      .print();

    env.execute("Kafka Flink Integration Example");
  }
}
For the FlatMapFunction we do the following
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;


// Simple FlatMap function to split the data
public static class Splitter implements FlatMapFunction<String, String> {
  @Override
  public void flatMap(String value, Collector<String> out) throws Exception {
    for (String word: value.split(" ")) {
      out.collect(word);
    }
  }
}
Example - FlinkKafkaConsumer for Avro
We need to add the Kafka and Avro dependencies to the project. We do it like this
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-streams-avro-serde</artifactId>
  <version>${confluent.version}</version>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry-client</artifactId>
  <version>${confluent.version}</version>
</dependency>
We do it like this
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "<bootstrap.servers>");
properties.setProperty("security.protocol", "SASL_SSL");
properties.setProperty("sasl.mechanism", "PLAIN");
properties.setProperty("sasl.jaas.config", "<sasl.jaas.config>");
properties.setProperty("group.id", "<group id>");
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("schema.registry.url", "<schema.registry.url>");
properties.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
properties.setProperty(KafkaAvroDeserializerConfig.VALUE_SUBJECT_NAME_STRATEGY,
  TopicRecordNameStrategy.class.getName());
// ConfluentRegistryAvroDeserializationSchema comes from the flink-avro-confluent-registry module
FlinkKafkaConsumer<MyRecord> kafkaConsumer = new FlinkKafkaConsumer<>(
  "<topic>",
  ConfluentRegistryAvroDeserializationSchema.forSpecific(MyRecord.class, "<schema.registry.url>"),
  properties
);
    
kafkaConsumer.setStartFromEarliest();
env.addSource(kafkaConsumer)
  .map(record -> {
    // TODO: Add custom logic here
    return record;
  })
  .print();
    
env.execute("Flink Kafka Consumer");


Sunday, December 3, 2023

Java Cryptography Architecture (JCA)

Introduction
The explanation is as follows
The Java Cryptography Architecture (JCA) is a key framework that offers a range of APIs for various cryptographic operations.
The classes that come with JCA live under the java.security and javax.crypto packages.
Some of them are listed below; a small sketch using them follows the list.
javax.crypto.Cipher
javax.crypto.KeyGenerator
javax.crypto.SecretKey
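A minimal sketch that uses these classes together, AES encryption and decryption of a string (the AES/GCM parameters are chosen here as an assumption):
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;

public class JcaDemo {

  public static void main(String[] args) throws Exception {
    // Generate a 256-bit AES key
    KeyGenerator keyGenerator = KeyGenerator.getInstance("AES");
    keyGenerator.init(256);
    SecretKey secretKey = keyGenerator.generateKey();

    // AES-GCM needs a fresh 12-byte IV for every encryption
    byte[] iv = new byte[12];
    new SecureRandom().nextBytes(iv);

    // Encrypt
    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
    cipher.init(Cipher.ENCRYPT_MODE, secretKey, new GCMParameterSpec(128, iv));
    byte[] cipherText = cipher.doFinal("hello jca".getBytes(StandardCharsets.UTF_8));

    // Decrypt with the same key and IV
    cipher.init(Cipher.DECRYPT_MODE, secretKey, new GCMParameterSpec(128, iv));
    byte[] plainText = cipher.doFinal(cipherText);

    System.out.println(new String(plainText, StandardCharsets.UTF_8)); // hello jca
  }
}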