回答:Apache FlinkとRiak CSに参加するにはどうすればいいですか?
Riak CSには、S3(バージョン2)互換インタフェースがあります。したがって、HadoopのS3ファイルシステムアダプタを使用してRiak CSを使用することは可能です。
私はなぜ知られていないが、ApacheのFLINKは脂肪ジャー内部のHadoopのファイルシステムアダプタの一部のみ(lib/flink-dist_2.11-1.0.1.jar
)すなわち、それはFTPファイルシステム(org.apache.hadoop.fs.ftp.FTPFileSystem
)を持っていますが、S3ファイルシステム(すなわちorg.apache.hadoop.fs.s3a.S3AFileSystem
)を持っていませんがあります。したがって、この問題を解決する2つの方法があります。
- これらのアダプターをHadoopインストールから使用してください。私はこれを試していないが、HADOOP_CLASSPATHまたはHADOOP_HOME evn変数を設定する必要があるようだ。
- モンキーパッチのApache FLINKので
<flink home>/lib
ディレクトリ
に必要なJARファイルをダウンロードし、私の環境で規定のHadoopにしたくないので、私は2番目の道を選択してください。あなたはのHadoop distのか、インターネットからJARをコピーすることができます。
curl http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.2/hadoop-aws-2.7.2.jar -o /flink/lib/hadoop-aws-2.7.2.jar
curl http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o /flink/lib/aws-java-sdk-1.7.4.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.2.5/httpcore-4.2.5.jar -o /flink/lib/httpcore-4.2.5.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar -o /flink/lib/httpclient-4.2.5.jar
あなたは、このようなバージョンは、Hadoopの2.7.2で使用して、私はHadoopののこのバージョンと互換性FLINKを使用するので、私は古いバージョンを使用しています見ることができるように。
FYI:このようなハックは、これらのJARの最新バージョンを独自のフローで使用している場合に問題を引き起こす可能性があります。
// Relocate org.apache.http packages because Apache Flink include old version of this library (we place them for using S3 compatible FS)
shadowJar {
dependencies {
include(dependency('.*:.*:.*'))
}
relocate 'org.apache.http', 'relocated.org.apache.http'
relocate 'org.apache.commons', 'relocated.org.apache.commons'
}
その後、あなたはHadoopの互換性のあるファイルシステムのでflink-conf.yaml
にcore-site.xml
へのパスを指定する必要があります別のバージョンに関連する問題を回避するために、あなたは、フローを使用して脂肪を瓶にのようなものを(私はGradleのを使用しています)を構築する際に、パッケージを再配置することができます負荷設定のため、この設定を使用して:
...
fs.hdfs.hadoopconf: /flink/conf
...
あなたは私がちょうど<fink home>/conf
ディレクトリにそれを置く見ることができるように。それは、次の設定があります。
<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> // because S3A better then other: https://wiki.apache.org/hadoop/AmazonS3
</property>
<property>
<name>fs.s3a.endpoint</name>
<value>my-riak-cs.stage.local</value> // this is my Riak CS host
</property>
<property>
<name>fs.s3a.connection.ssl.enabled</name> // my Riak CS in staging doesn't support SSL
<value>false</value>
</property>
<property>
<name>fs.s3a.access.key</name>
<value>????</value> // this is my access key for Riak CS
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>????</value> // this is my secret key for Riak CS
</property>
</configuration>
その後、あなたは、推薦hereとしてflink-conf.yaml
にRiakにCSバケットを構成する必要があります。
...
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
...
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
...
とRiakにCSでバケットを作成します。私はs3cmd
を使用しています(私のOS X用のdevのENVにbrew
の上に設置さ):FYI
s3cmd mb s3://example-staging-flink
は:
signature_v2 = True // because Riak CS using S3 V2 interface
use_https = False // if your don't use SSL
access_key = ???
secret_key = ???
host_base = my-riak-cs.stage.local // your Riak CS host
host_bucket = %(bucket).my-riak-cs.stage.local // format of bucket used by Riak CS
:s3cmd
を使用する前に、あなたはそれが~/.s3cmd
ファイルにいくつかの設定をs3cmd --configure
を使用して、修正する設定する必要がありますつまり、Riak CSのスタンドアロンHA Apache Flinkクラスタの保存/復元状態を設定する必要があります。