2016-12-19 5 views
0

私はすでにSpring XDのためにthis questionを要求しています。私は今、Spring CDFに移行しようとしています。Spring Cloud DataFlow - TCPソースでカスタムTCPエンコーダ/デコーダを使用する方法

私はthis linkを見つけました。そこでコードを再利用しようとしましたが、私のコードを変更しました。

は、私は、次のPOMを作成しました:

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 

    <groupId>com.example</groupId> 
    <artifactId>tcp-ber-source</artifactId> 
    <version>1.0</version> 
    <packaging>jar</packaging> 

    <name>TCP Ber Source</name> 

    <parent> 
     <groupId>org.springframework.boot</groupId> 
     <artifactId>spring-boot-starter-parent</artifactId> 
     <version>1.4.2.RELEASE</version> 
     <relativePath/> <!-- lookup parent from repository --> 
    </parent> 

    <properties> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 
     <java.version>1.8</java.version> 

     <tcp-app-starters-common.version>1.1.0.RELEASE</tcp-app-starters-common.version> 
    </properties> 

    <dependencies> 
     <dependency> 
      <groupId>org.springframework.cloud</groupId> 
      <artifactId>spring-cloud-starter-stream-kafka</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.springframework.cloud.stream.app</groupId> 
      <artifactId>tcp-app-starters-common</artifactId> 
      <version>${tcp-app-starters-common.version}</version> 
     </dependency> 

     <dependency> 
      <groupId>com.example</groupId> 
      <artifactId>ber-byte-array-serializers</artifactId> 
      <version>1.0</version> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-configuration-processor</artifactId> 
      <optional>true</optional> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter-test</artifactId> 
      <scope>test</scope> 
     </dependency> 
    </dependencies> 

    <dependencyManagement> 
     <dependencies> 
      <dependency> 
       <groupId>org.springframework.cloud</groupId> 
       <artifactId>spring-cloud-dependencies</artifactId> 
       <version>Camden.SR3</version> 
       <type>pom</type> 
       <scope>import</scope> 
      </dependency> 
     </dependencies> 
    </dependencyManagement> 

    <build> 
     <plugins> 
      <plugin> 
       <groupId>org.springframework.boot</groupId> 
       <artifactId>spring-boot-maven-plugin</artifactId> 
      </plugin> 
     </plugins> 
    </build> 
</project> 

構成:

@EnableBinding(Source.class) 
@EnableConfigurationProperties(TcpBerSourceProperties.class) 
public class TcpBerSourceConfiguration { 

    private final Source channels; 
    private final TcpBerSourceProperties properties; 

    @Autowired 
    public TcpSourceConfiguration(final TcpBerSourceProperties properties, final Source channels) { 
     this.properties = properties; 
     this.channels = channels; 
    } 

    @Bean 
    public TcpReceivingChannelAdapter adapter(@Qualifier("tcpBerSourceConnectionFactory") final AbstractConnectionFactory connectionFactory) { 
     final TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter(); 
     adapter.setConnectionFactory(connectionFactory); 
     adapter.setOutputChannel(this.channels.output()); 
     return adapter; 
    } 

    @Bean 
    public TcpConnectionFactoryFactoryBean tcpBerSourceConnectionFactory(@Qualifier("tcpBerSourceDecoder") final AbstractByteArraySerializer decoder) throws Exception { 
     final TcpConnectionFactoryFactoryBean factoryBean = new TcpConnectionFactoryFactoryBean(); 
     factoryBean.setType("server"); 
     factoryBean.setPort(this.properties.getPort()); 
     factoryBean.setUsingNio(this.properties.isNio()); 
     factoryBean.setUsingDirectBuffers(this.properties.isUseDirectBuffers()); 
     factoryBean.setLookupHost(this.properties.isReverseLookup()); 
     factoryBean.setDeserializer(decoder); 
     factoryBean.setSoTimeout(this.properties.getSocketTimeout()); 
     return factoryBean; 
    } 

    @Bean 
    public BerEncoderDecoderFactoryBean tcpBerSourceDecoder() { 
     final BerEncoderDecoderFactoryBean factoryBean = new BerEncoderDecoderFactoryBean(this.properties.getDecoder()); 
     factoryBean.setMaxMessageSize(this.properties.getBufferSize()); 
     return factoryBean; 
    } 
} 

そしてこのFactoryBeanの:

public class BerEncoderDecoderFactoryBean extends AbstractFactoryBean<AbstractByteArraySerializer> implements ApplicationEventPublisherAware { 

    private final BerEncoding encoding; 

    private ApplicationEventPublisher applicationEventPublisher; 
    private Integer maxMessageSize; 

    public BerEncoderDecoderFactoryBean(final BerEncoding encoding) { 
     Assert.notNull(encoding, "'encoding' cannot be null"); 
     this.encoding = encoding; 
    } 

    @Override 
    public void setApplicationEventPublisher(final ApplicationEventPublisher applicationEventPublisher) { 
     this.applicationEventPublisher = applicationEventPublisher; 
    } 

    /** 
    * The maximum message size allowed when decoding. 
    * @param maxMessageSize the maximum message size. 
    */ 
    public void setMaxMessageSize(final int maxMessageSize) { 
     this.maxMessageSize = maxMessageSize; 
    } 

    @Override 
    protected AbstractByteArraySerializer createInstance() throws Exception { 
     final AbstractByteArraySerializer codec; 
     switch (this.encoding) { 
      case SPLIT: 
       codec = new ByteArrayBerSplitSerializer(); 
       break; 
      case EXTRACT: 
       codec = new ByteArrayBerExtractSerializer(); 
       break; 
      default: 
       throw new IllegalArgumentException("Invalid encoding: " + this.encoding); 
     } 
     codec.setApplicationEventPublisher(this.applicationEventPublisher); 
     if (this.maxMessageSize != null) { 
      codec.setMaxMessageSize(this.maxMessageSize); 
     } 
     return codec; 
    } 

    @Override 
    public Class<?> getObjectType() { 
     return AbstractByteArraySerializer.class; 
    } 
} 

BerEncodingは、単純な列挙型で、TcpBerSourcePropertiesは非常に簡単です。

これは正しいアプローチですか?

これはどうすれば実行できますか?私は@SpringBootApplicationどこでも私は春のブートスタンドアロンアプリケーションとして実行するために言及したリンク上で見つけたTCPストリームアプリの先駆けを見ることができません?

答えて

1

独自のSpring起動Appクラスを作成し、設定クラスをインポートする必要があります。 the documentation about creating custom appsを参照してください。

スターターからスタンダードアプリ(ウサギとカフカバインダー用)をexplained hereとして生成します。

+0

うわー、すごいです、私はそれらの文書を見逃しました...ありがとう! – aturkovic

関連する問題