1. 소개

이 기사에서는 향후 개발을위한 시작점 역할을 할 수있는 부트 스트랩 및 바로 사용할 수있는 애플리케이션을 제공하는 Spring Cloud 앱 스타터를 사용하는 방법을 보여줍니다.

간단히 말해, Task App Starters는 데이터베이스 마이그레이션 및 분산 테스트와 같은 사용 사례 전용이며 Stream App Starters는 외부 시스템과의 통합을 제공합니다.

전반적으로 55 개가 넘는 선발 선수가 있습니다. 이 두 가지에 대한 자세한 내용은 여기여기 에서 공식 문서를 확인 하십시오 .

다음으로 Twitter 게시물을 Hadoop 분산 파일 시스템으로 스트리밍하는 소규모 분산 Twitter 애플리케이션을 구축합니다.

2. 설정하기

우리는 사용할 것이다 소비자 키액세스 토큰을 간단한 트위터 응용 프로그램을 만들 수 있습니다.

그런 다음 향후 빅 데이터 목적을 위해 Twitter 스트림을 유지할 수 있도록 Hadoop을 설정합니다.

마지막으로, 우리는 컴파일하고 조립의 독립형 구성 요소에 공급 된 Spring GitHub의 저장소 중 하나를 사용할 수있는 옵션이 소스 - 프로세서 - 싱크 메이븐을 사용하여 아키텍처 패턴 또는 결합 소스 , 프로세서싱크를 자신의 Spring 스트림 바인딩 인터페이스를 통해입니다.

이 작업을 수행하는 두 가지 방법을 모두 살펴 보겠습니다.

이전에는 모든 Stream App Starter가 github.com/spring-cloud/spring-cloud-stream-app-starters 에서 하나의 대규모 저장소로 수집되었다는 점에 주목할 가치가 있습니다. 각 스타터는 단순화되고 분리되었습니다.

3. Twitter 자격 증명

먼저 Twitter 개발자 자격 증명을 설정하겠습니다. Twitter 개발자 자격 증명을 얻으려면 단계에 따라 앱을 설정 하고 공식 Twitter 개발자 문서에서 액세스 토큰 만듭니다 .

특히 다음이 필요합니다.

  1. 소비자 키
  2. 소비자 키 비밀
  3. 액세스 토큰 비밀
  4. 액세스 토큰

아래에서 사용할 것이므로 해당 창을 열어 두거나 적어 두십시오!

4. Hadoop 설치

다음으로 Hadoop을 설치하겠습니다! 공식 문서 를 따르 거나 단순히 Docker를 활용할 수 있습니다 .

$ sudo docker run -p 50070:50070 sequenceiq/hadoop-docker:2.4.1

5. 앱 스타터 컴파일

독립된 완전 개별 구성 요소를 사용하기 위해 GitHub 저장소에서 원하는 Spring Cloud Stream App Starter를 개별적으로 다운로드하고 컴파일 할 수 있습니다.

5.1. Twitter Spring Cloud Stream 앱 스타터

Twitter Spring Cloud Stream App Starter ( org.springframework.cloud.stream.app.twitterstream.source )를 프로젝트에 추가해 보겠습니다 .

git clone https://github.com/spring-cloud-stream-app-starters/twitter.git

그런 다음 Maven을 실행합니다.

./mvnw clean install -PgenerateApps

결과적으로 컴파일 된 시작 앱은 로컬 프로젝트 루트의 '/ target'에서 사용할 수 있습니다.

그런 다음 컴파일 된 .jar를 실행하고 다음과 같이 관련 애플리케이션 속성을 전달할 수 있습니다.

java -jar twitter_stream_source.jar --consumerKey=<CONSUMER_KEY> --consumerSecret=<CONSUMER_SECRET> \
    --accessToken=<ACCESS_TOKEN> --accessTokenSecret=<ACCESS_TOKEN_SECRET>

익숙한 Spring application.properties를 사용하여 자격 증명을 전달할 수도 있습니다 .

twitter.credentials.access-token=...
twitter.credentials.access-token-secret=...
twitter.credentials.consumer-key=...
twitter.credentials.consumer-secret=...

5.2. HDFS 스프링 클라우드 스트림 앱 스타터

이제 (Hadoop이 이미 설정되어있는 상태에서) HDFS Spring Cloud Stream App Starter ( org.springframework.cloud.stream.app.hdfs.sink ) 의존성을 프로젝트에 추가하겠습니다.

먼저 관련 저장소를 복제합니다.

git clone https://github.com/spring-cloud-stream-app-starters/hdfs.git

그런 다음 Maven 작업을 실행합니다.

./mvnw clean install -PgenerateApps

결과적으로 컴파일 된 시작 앱은 로컬 프로젝트 루트의 '/ target'에서 사용할 수 있습니다. 그런 다음 컴파일 된 .jar를 실행하고 관련 애플리케이션 속성을 전달할 수 있습니다.

java -jar hdfs-sink.jar --fsUri=hdfs://127.0.0.1:50010/

' hdfs : //127.0.0.1 : 50010 / '은 Hadoop의 기본값이지만 기본 HDFS 포트는 인스턴스 구성 방법에 따라 다를 수 있습니다.

이전에 전달한 구성을 고려할 때 ' http://0.0.0.0:50070 ' 에서 데이터 노드 (및 해당 현재 포트) List을 볼 수 있습니다 .

컴파일 전에 익숙한 Spring application.properties사용하여 자격 증명을 전달할 수도 있으므로 항상 CLI를 통해 전달할 필요가 없습니다.

기본 Hadoop 포트를 사용 하도록 application.properties 구성 해 보겠습니다 .

hdfs.fs-uri=hdfs://127.0.0.1:50010/

6. AggregateApplicationBuilder 사용

또는 org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder통해 Spring Stream SourceSink간단한 Spring Boot 애플리케이션으로 결합 할 수 있습니다 !

먼저 pom.xml에 두 개의 Stream App Starter를 추가합니다 .

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-source-twitterstream</artifactId>
        <version>2.1.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-sink-hdfs</artifactId>
        <version>2.1.2.RELEASE</version>
    </dependency>
</dependencies>

그런 다음 두 개의 Stream App Starter 의존성을 각각의 하위 애플리케이션으로 래핑하여 결합하기 시작합니다.

6.1. 앱 구성 요소 구축

우리 SourceApp는 지정 소스가 변형되거나 소비되는 :

@SpringBootApplication
@EnableBinding(Source.class)
@Import(TwitterstreamSourceConfiguration.class)
public class SourceApp {
    @InboundChannelAdapter(Source.OUTPUT)
    public String timerMessageSource() {
        return new SimpleDateFormat().format(new Date());
    }
}

SourceApporg.springframework.cloud.stream.messaging.Source에 바인딩하고 적절한 구성 클래스를 주입하여 환경 속성에서 필요한 설정을 선택합니다.

다음으로 간단한 org.springframework.cloud.stream.messaging.Processor 바인딩을 설정합니다 .

@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApp {
    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public String processMessage(String payload) {
        log.info("Payload received!");
        return payload;
    }
}

그런 다음 소비자 ( Sink )를 만듭니다 .

@SpringBootApplication
@EnableBinding(Sink.class)
@Import(HdfsSinkConfiguration.class)
public class SinkApp {
    @ServiceActivator(inputChannel= Sink.INPUT)
    public void loggerSink(Object payload) {
        log.info("Received: " + payload);
    }
}

여기에서 SinkApporg.springframework.cloud.stream.messaging.Sink에 바인딩 하고 지정된 Hadoop 설정을 사용하도록 올바른 구성 클래스를 다시 삽입합니다.

마지막으로, 우리는 우리의 결합 SourceApp , ProcessorApp , 우리 SinkApp 은 Using AggregateApplicationBuilder을 우리에 AggregateApp 주요 방법 :

@SpringBootApplication
public class AggregateApp {
    public static void main(String[] args) {
        new AggregateApplicationBuilder()
          .from(SourceApp.class).args("--fixedDelay=5000")
          .via(ProcessorApp.class)
          .to(SinkApp.class).args("--debug=true")
          .run(args);
    }
}

모든 Spring Boot 애플리케이션과 마찬가지로 application.properties를 통해 또는 프로그래밍 방식 으로 지정된 설정을 환경 속성으로 삽입 할 수 있습니다 .

Spring Stream 프레임 워크를 사용하고 있기 때문에 인수를 AggregateApplicationBuilder 생성자에 전달할 수도 있습니다 .

6.2. 완성 된 앱 실행

그런 다음 다음 명령 줄 지침을 사용하여 응용 프로그램을 컴파일하고 실행할 수 있습니다.

    $ mvn install
    $ java -jar twitterhdfs.jar

@SpringBootApplication 클래스를 별도의 패키지에 보관해야 합니다 (그렇지 않으면 여러 다른 바인딩 예외가 발생합니다)! 사용하는 방법에 대한 자세한 내용은 AggregateApplicationBuilder를 - 상기보고가 공식 문서를 .

앱을 컴파일하고 실행 한 후 콘솔에 다음과 같은 내용이 표시되어야합니다 (당연히 내용은 트윗에 따라 다릅니다).

2018-01-15 04:38:32.255  INFO 28778 --- [itterSource-1-1] 
c.b.twitterhdfs.processor.ProcessorApp   : Payload received!
2018-01-15 04:38:32.255  INFO 28778 --- [itterSource-1-1] 
com.baeldung.twitterhdfs.sink.SinkApp    : Received: {"created_at":
"Mon Jan 15 04:38:32 +0000 2018","id":952761898239385601,"id_str":
"952761898239385601","text":"RT @mighty_jimin: 180114 ...

그것들 소스 로부터 데이터를 수신 할 때 프로세서싱크 의 올바른 작동을 보여줍니다 ! 이 예에서는 HDFS 싱크가 많은 작업을 수행하도록 구성하지 않았습니다. "Payload received!"라는 메시지 만 출력됩니다.

7. 결론

이 예제에서 우리는 두 개의 멋진 Spring Stream App Starter를 하나의 멋진 Spring Boot 예제로 결합하는 방법을 배웠습니다!

다음은 Spring Boot Starters 에 대한 다른 훌륭한 공식 기사 맞춤형 스타터 를 만드는 방법입니다 !

항상 그렇듯이이 기사에 사용 된 코드 는 GitHub 에서 찾을 수 있습니다 .