package KafkaProducer.producer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class Consumer {

public static void main(String[] args) {
// TODO Auto-generated method stub
Properties configs = new Properties();
configs.put("bootstrap.servers", "211.41.186.140:9092, 211.41.186.140:9093, 211.41.186.140:9094");     // kafka server host 및 port
        // bootstrap.servers 카프카 클러스터에 처음 연결을 하기 위한 호스트와 포트 정보로 구성된 리스트
configs.put("session.timeout.ms", "10000");             // session 설정 *(5초로 설정)*
        // 컨슈머와 브로커 사이의 세션타임 아웃 시간, 브로커가 살아있는것으로 판단하는 시간
configs.put("group.id", "tims-kafka");                // topic 설정
        // 컨슈머가 속한 컨슈머 그룹을 식별하는 식별자
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");    // key deserializer
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  // value deserializer
        //"key.deserializer", "value.deserializer" - 바이트로 표현된 Key, Value 값을 다시 객체로 만들어 주는 클래스
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);    // consumer 생성
        consumer.subscribe(Arrays.asList("spark"));      
        // 이 설정들을 세팅한 Properties 객체를 인자로 넘겨서 KafkaConsumer 객체를 생성, topic 설정
        
        while (true) {  // 계속 loop를 돌면서 브로커에서 메시지를 읽어 들임
            ConsumerRecords<String, String> records = consumer.poll(500); 
            // 일정량의 ConsumerRecord 들이 담긴 ConsumerRecords 객체가 리턴
            for (ConsumerRecord<String, String> record : records) {
                String s = record.topic();
                if (s.equals("spark")) {
                    System.out.println(record.value());
                } else {
                    throw new IllegalStateException("get message on topic " + record.topic());
                }
            }
        }   
}
}

 

package KafkaProducer.producer;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {
public static void main(String[] args) {
// TODO Auto-generated method stub
Properties configs = new Properties();
        configs.put("bootstrap.servers", "211.41.186.140:9092, 211.41.186.140:9093, 211.41.186.140:9094");
        // 카프카 프로듀서가 최초로 접속할 때 필요한 주소들
        configs.put("acks", "all");
        // 자신이 보낸 메시지에 대해 카프카로부터 확인을 기다리지 않음, 속도가 가장 느리며 메시지 손실 가능성 없음
        
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   // serialize 설정
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정
        // 카프카 메시지의 키와 데이터 값를 바이트 배열로 만들어 줄 클래스를 명시한다.
        // org.apache.kafka.common.serialization 인터페이스를 구현한 클래스를 사용할 수 있다
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
        // message를 생성
        for (int i = 0; i < 5; i++) {
            String v = "hello"+i;
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("spark", v);
            // 카프카 서버로 보낼 메시지 생성, ProducerRecord 생성자의 첫 번째 인자는 Topic 이름, 두 번째는 전송할 데이터
            producer.send(record);
        }       
        // 종료
        producer.flush();
        producer.close();
}
}

 

<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>
  <groupId>KafkaProducer</groupId>
  <artifactId>producer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>producer</name>
  <url>http://maven.apache.org</url>
  <properties>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.1.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>1.1.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.8.2.1</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>
  <repositories>
    <repository>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
      <id>central</id>
      <name>Central Repository</name>
      <url>https://repo.maven.apache.org/maven2</url>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <releases>
        <updatePolicy>never</updatePolicy>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
      <id>central</id>
      <name>Central Repository</name>
      <url>https://repo.maven.apache.org/maven2</url>
    </pluginRepository>
  </pluginRepositories>
  <build>
    <sourceDirectory>C:\Users\이희원\eclipse-workspace\producer\src\main\java</sourceDirectory>
    <scriptSourceDirectory>C:\Users\이희원\eclipse-workspace\producer\src\main\scripts</scriptSourceDirectory>
    <testSourceDirectory>C:\Users\이희원\eclipse-workspace\producer\src\test\java</testSourceDirectory>
    <outputDirectory>C:\Users\이희원\eclipse-workspace\producer\target\classes</outputDirectory>
    <testOutputDirectory>C:\Users\이희원\eclipse-workspace\producer\target\test-classes</testOutputDirectory>
    <resources>
      <resource>
        <directory>C:\Users\이희원\eclipse-workspace\producer\src\main\resources</directory>
      </resource>
    </resources>
    <testResources>
      <testResource>
        <directory>C:\Users\이희원\eclipse-workspace\producer\src\test\resources</directory>
      </testResource>
    </testResources>
    <directory>C:\Users\이희원\eclipse-workspace\producer\target</directory>
    <finalName>producer-0.0.1-SNAPSHOT</finalName>
    <pluginManagement>
      <plugins>
        <plugin>
          <artifactId>maven-antrun-plugin</artifactId>
          <version>1.3</version>
        </plugin>
        <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <version>2.2-beta-5</version>
        </plugin>
        <plugin>
          <artifactId>maven-dependency-plugin</artifactId>
          <version>2.8</version>
        </plugin>
        <plugin>
          <artifactId>maven-release-plugin</artifactId>
          <version>2.5.3</version>
        </plugin>
      </plugins>
    </pluginManagement>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <executions>
          <execution>
            <id>default-compile</id>
            <phase>compile</phase>
            <goals>
              <goal>compile</goal>
            </goals>
            <configuration>
              <source>1.8</source>
              <target>1.8</target>
            </configuration>
          </execution>
          <execution>
            <id>default-testCompile</id>
            <phase>test-compile</phase>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <source>1.8</source>
              <target>1.8</target>
            </configuration>
          </execution>
        </executions>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-clean-plugin</artifactId>
        <version>2.5</version>
        <executions>
          <execution>
            <id>default-clean</id>
            <phase>clean</phase>
            <goals>
              <goal>clean</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-resources-plugin</artifactId>
        <version>2.6</version>
        <executions>
          <execution>
            <id>default-testResources</id>
            <phase>process-test-resources</phase>
            <goals>
              <goal>testResources</goal>
            </goals>
          </execution>
          <execution>
            <id>default-resources</id>
            <phase>process-resources</phase>
            <goals>
              <goal>resources</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.4</version>
        <executions>
          <execution>
            <id>default-jar</id>
            <phase>package</phase>
            <goals>
              <goal>jar</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.12.4</version>
        <executions>
          <execution>
            <id>default-test</id>
            <phase>test</phase>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-install-plugin</artifactId>
        <version>2.4</version>
        <executions>
          <execution>
            <id>default-install</id>
            <phase>install</phase>
            <goals>
              <goal>install</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-deploy-plugin</artifactId>
        <version>2.7</version>
        <executions>
          <execution>
            <id>default-deploy</id>
            <phase>deploy</phase>
            <goals>
              <goal>deploy</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-site-plugin</artifactId>
        <version>3.3</version>
        <executions>
          <execution>
            <id>default-site</id>
            <phase>site</phase>
            <goals>
              <goal>site</goal>
            </goals>
            <configuration>
              <outputDirectory>C:\Users\이희원\eclipse-workspace\producer\target\site</outputDirectory>
              <reportPlugins>
                <reportPlugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-project-info-reports-plugin</artifactId>
                </reportPlugin>
              </reportPlugins>
            </configuration>
          </execution>
          <execution>
            <id>default-deploy</id>
            <phase>site-deploy</phase>
            <goals>
              <goal>deploy</goal>
            </goals>
            <configuration>
              <outputDirectory>C:\Users\이희원\eclipse-workspace\producer\target\site</outputDirectory>
              <reportPlugins>
                <reportPlugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-project-info-reports-plugin</artifactId>
                </reportPlugin>
              </reportPlugins>
            </configuration>
          </execution>
        </executions>
        <configuration>
          <outputDirectory>C:\Users\이희원\eclipse-workspace\producer\target\site</outputDirectory>
          <reportPlugins>
            <reportPlugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-project-info-reports-plugin</artifactId>
            </reportPlugin>
          </reportPlugins>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <outputDirectory>C:\Users\이희원\eclipse-workspace\producer\target\site</outputDirectory>
  </reporting>
</project>
반응형

+ Recent posts