package KafkaProducer.producer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
 * Minimal Kafka consumer example.
 *
 * <p>Subscribes to the {@code spark} topic as a member of the {@code tims-kafka}
 * consumer group and prints the value of every received record to standard
 * output. The poll loop runs until the process is stopped or an exception is
 * thrown; in either exceptional case the consumer is closed before the
 * exception propagates.
 */
public class Consumer {

    /** Name of the only topic this example subscribes to. */
    private static final String TOPIC = "spark";

    /**
     * Configures a {@link KafkaConsumer}, subscribes to the topic and polls forever.
     *
     * @param args command-line arguments (unused)
     * @throws IllegalStateException if a record arrives from a topic other than {@code spark}
     */
    public static void main(String[] args) {
        Properties configs = new Properties();
        // Host:port list used only for the initial connection to the Kafka cluster.
        configs.put("bootstrap.servers", "211.41.186.140:9092, 211.41.186.140:9093, 211.41.186.140:9094");
        // Session timeout between the consumer and the broker, in milliseconds (10 seconds).
        configs.put("session.timeout.ms", "10000");
        // Identifier of the consumer group this consumer belongs to.
        configs.put("group.id", "tims-kafka");
        // Classes that turn the raw key/value bytes back into String objects.
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources guarantees the consumer is closed when the loop is left
        // through an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs)) {
            consumer.subscribe(Arrays.asList(TOPIC));
            while (true) {
                // Wait up to 500 ms for a batch of records from the broker.
                ConsumerRecords<String, String> records = consumer.poll(500);
                for (ConsumerRecord<String, String> record : records) {
                    String topic = record.topic();
                    if (!TOPIC.equals(topic)) {
                        // Only one topic is subscribed, so anything else is unexpected.
                        throw new IllegalStateException("get message on topic " + topic);
                    }
                    System.out.println(record.value());
                }
            }
        }
    }
}
package KafkaProducer.producer;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * Minimal Kafka producer example.
 *
 * <p>Sends five string messages ({@code hello0} .. {@code hello4}) to the
 * {@code spark} topic, waits until they have been transmitted and then closes
 * the producer.
 */
public class Producer {

    /** Topic the example messages are written to. */
    private static final String TOPIC = "spark";

    /** Number of example messages to send. */
    private static final int MESSAGE_COUNT = 5;

    /**
     * Configures a {@link KafkaProducer} and publishes the example messages.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties configs = new Properties();
        // Addresses the producer uses for its first connection to the cluster.
        configs.put("bootstrap.servers", "211.41.186.140:9092, 211.41.186.140:9093, 211.41.186.140:9094");
        // "all": wait for the full set of in-sync replicas to acknowledge each record.
        // Slowest setting, but gives the strongest delivery guarantee.
        configs.put("acks", "all");
        // Classes that turn the String key and value into byte arrays.
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer even if sending fails part-way through.
        try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs)) {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String value = "hello" + i;
                // First constructor argument is the topic name, second is the payload.
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, value);
                // send() is asynchronous; the callback reports delivery failures that
                // would otherwise be dropped silently.
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("failed to send \"" + value + "\": " + exception);
                    }
                });
            }
            // Block until every buffered record has been transmitted.
            producer.flush();
        }
    }
}
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <!--
    Maven build descriptor for the Kafka producer/consumer example.
    All directories are expressed relative to ${project.basedir} so the
    project builds on any machine, not only in the original workspace.
  -->
  <modelVersion>4.0.0</modelVersion>
  <groupId>KafkaProducer</groupId>
  <artifactId>producer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>producer</name>
  <url>http://maven.apache.org</url>
  <properties>
    <!-- Compile for Java 8 and read sources as UTF-8. -->
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <!-- Producer and consumer client API used by the example classes. -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.1.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>1.1.0</version>
      <scope>compile</scope>
    </dependency>
    <!--
      NOTE(review): kafka_2.11 0.8.2.1 is far older than the 1.1.0 client
      artifacts above; confirm it is still required before keeping it.
    -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.8.2.1</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>
  <repositories>
    <repository>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
      <id>central</id>
      <name>Central Repository</name>
      <url>https://repo.maven.apache.org/maven2</url>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <releases>
        <updatePolicy>never</updatePolicy>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
      <id>central</id>
      <name>Central Repository</name>
      <url>https://repo.maven.apache.org/maven2</url>
    </pluginRepository>
  </pluginRepositories>
  <build>
    <!-- Standard Maven directory layout, relative to the project root. -->
    <sourceDirectory>${project.basedir}/src/main/java</sourceDirectory>
    <scriptSourceDirectory>${project.basedir}/src/main/scripts</scriptSourceDirectory>
    <testSourceDirectory>${project.basedir}/src/test/java</testSourceDirectory>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <testOutputDirectory>${project.build.directory}/test-classes</testOutputDirectory>
    <resources>
      <resource>
        <directory>${project.basedir}/src/main/resources</directory>
      </resource>
    </resources>
    <testResources>
      <testResource>
        <directory>${project.basedir}/src/test/resources</directory>
      </testResource>
    </testResources>
    <directory>${project.basedir}/target</directory>
    <finalName>producer-0.0.1-SNAPSHOT</finalName>
    <pluginManagement>
      <plugins>
        <plugin>
          <artifactId>maven-antrun-plugin</artifactId>
          <version>1.3</version>
        </plugin>
        <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <version>2.2-beta-5</version>
        </plugin>
        <plugin>
          <artifactId>maven-dependency-plugin</artifactId>
          <version>2.8</version>
        </plugin>
        <plugin>
          <artifactId>maven-release-plugin</artifactId>
          <version>2.5.3</version>
        </plugin>
      </plugins>
    </pluginManagement>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <executions>
          <execution>
            <id>default-compile</id>
            <phase>compile</phase>
            <goals>
              <goal>compile</goal>
            </goals>
            <configuration>
              <source>1.8</source>
              <target>1.8</target>
            </configuration>
          </execution>
          <execution>
            <id>default-testCompile</id>
            <phase>test-compile</phase>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <source>1.8</source>
              <target>1.8</target>
            </configuration>
          </execution>
        </executions>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-clean-plugin</artifactId>
        <version>2.5</version>
        <executions>
          <execution>
            <id>default-clean</id>
            <phase>clean</phase>
            <goals>
              <goal>clean</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-resources-plugin</artifactId>
        <version>2.6</version>
        <executions>
          <execution>
            <id>default-testResources</id>
            <phase>process-test-resources</phase>
            <goals>
              <goal>testResources</goal>
            </goals>
          </execution>
          <execution>
            <id>default-resources</id>
            <phase>process-resources</phase>
            <goals>
              <goal>resources</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.4</version>
        <executions>
          <execution>
            <id>default-jar</id>
            <phase>package</phase>
            <goals>
              <goal>jar</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.12.4</version>
        <executions>
          <execution>
            <id>default-test</id>
            <phase>test</phase>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-install-plugin</artifactId>
        <version>2.4</version>
        <executions>
          <execution>
            <id>default-install</id>
            <phase>install</phase>
            <goals>
              <goal>install</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-deploy-plugin</artifactId>
        <version>2.7</version>
        <executions>
          <execution>
            <id>default-deploy</id>
            <phase>deploy</phase>
            <goals>
              <goal>deploy</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-site-plugin</artifactId>
        <version>3.3</version>
        <executions>
          <execution>
            <id>default-site</id>
            <phase>site</phase>
            <goals>
              <goal>site</goal>
            </goals>
            <configuration>
              <outputDirectory>${project.build.directory}/site</outputDirectory>
              <reportPlugins>
                <reportPlugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-project-info-reports-plugin</artifactId>
                </reportPlugin>
              </reportPlugins>
            </configuration>
          </execution>
          <execution>
            <id>default-deploy</id>
            <phase>site-deploy</phase>
            <goals>
              <goal>deploy</goal>
            </goals>
            <configuration>
              <outputDirectory>${project.build.directory}/site</outputDirectory>
              <reportPlugins>
                <reportPlugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-project-info-reports-plugin</artifactId>
                </reportPlugin>
              </reportPlugins>
            </configuration>
          </execution>
        </executions>
        <configuration>
          <outputDirectory>${project.build.directory}/site</outputDirectory>
          <reportPlugins>
            <reportPlugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-project-info-reports-plugin</artifactId>
            </reportPlugin>
          </reportPlugins>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <outputDirectory>${project.build.directory}/site</outputDirectory>
  </reporting>
</project>
반응형
'프로그래밍 > RabbitMQ & Kafka' 카테고리의 다른 글
RabbitMQ 쓰는이유 (0) | 2022.01.12 |
---|---|
[Kafka] Maven project 로 Producer와 Consumer 작성해보기 (0) | 2021.11.30 |
[RabbitMQ] Ubuntu에서 RabbitMQ 튜토리얼 따라하기 (Pub/Sub JAVA) (0) | 2021.11.22 |
[RabbitMQ] 리눅스기반 우분투에서 RabbitMQ 사용하기 (0) | 2021.11.16 |