2017-12-27 25 views
2

最近Flink 1.3.2から1.4.0へのアップグレードを試みましたが、もうorg.apache.hadoop.fs.{FileSystem, Path}をインポートできない問題があります。問題は、2つの場所で発生している:Flink 1.3.2から1.4.0へのアップグレードhadoop FileSystemとPathの問題

ParquetWriter:

import org.apache.avro.Schema 
import org.apache.avro.generic.GenericRecord 
import org.apache.hadoop.fs.{FileSystem, Path} 
import org.apache.flink.streaming.connectors.fs.Writer 
import org.apache.parquet.avro.AvroParquetWriter 
import org.apache.parquet.hadoop.ParquetWriter 
import org.apache.parquet.hadoop.metadata.CompressionCodecName 

class AvroWriter[T <: GenericRecord]() extends Writer[T] { 

    @transient private var writer: ParquetWriter[T] = _ 
    @transient private var schema: Schema = _ 

    override def write(element: T): Unit = { 
    schema = element.getSchema 
    writer.write(element) 
    } 

    override def duplicate(): AvroWriter[T] = new AvroWriter[T]() 

    override def close(): Unit = writer.close() 

    override def getPos: Long = writer.getDataSize 

    override def flush(): Long = writer.getDataSize 

    override def open(fs: FileSystem, path: Path): Unit = { 
    writer = AvroParquetWriter.builder[T](path) 
     .withSchema(schema) 
     .withCompressionCodec(CompressionCodecName.SNAPPY) 
     .build() 
    } 

} 

CustomBucketer:

import org.apache.flink.core.fs.{FileSystem, Path} 

しかし、新しいPathはしていません:私はFLINKが今持っていることに気づいた

import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer 
import org.apache.flink.streaming.connectors.fs.Clock 
import org.apache.hadoop.fs.{FileSystem, Path} 
import java.io.ObjectInputStream 
import java.text.SimpleDateFormat 
import java.util.Date 

import org.apache.avro.generic.GenericRecord 

import scala.reflect.ClassTag 

class RecordFieldBucketer[T <: GenericRecord: ClassTag](dateField: String = null, dateFieldFormat: String = null, bucketOrder: Seq[String]) extends Bucketer[T] { 

    @transient var dateFormatter: SimpleDateFormat = _ 

    private def readObject(in: ObjectInputStream): Unit = { 
    in.defaultReadObject() 
    if (dateField != null && dateFieldFormat != null) { 
     dateFormatter = new SimpleDateFormat(dateFieldFormat) 
    } 
    } 

    override def getBucketPath(clock: Clock, basePath: Path, element: T): Path = { 
    val partitions = bucketOrder.map(field => { 
     if (field == dateField) { 
     field + "=" + dateFormatter.format(new Date(element.get(field).asInstanceOf[Long])) 
     } else { 
     field + "=" + element.get(field) 
     } 
    }).mkString("/") 
    new Path(basePath + "/" + partitions) 
    } 

} 

AvroParquetWriterまたはで動作するようです0メソッド。私はFlinkのFileSystemとHadoopの依存関係にいくつかの変更が加えられていることを知っています。

Hadoopの依存関係を使用する必要があるのですか、それとも、Parquetファイルをs3に書き込んでバケッティングする方法が異なるのですか?

build.sbt:

val flinkVersion = "1.4.0" 

libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-scala" % flinkVersion % Provided, 
    "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided, 
    "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion, 
    "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion, 
    "org.apache.flink" % "flink-metrics-core" % flinkVersion, 
    "org.apache.flink" % "flink-metrics-graphite" % flinkVersion, 
    "org.apache.kafka" %% "kafka" % "0.10.0.1", 
    "org.apache.avro" % "avro" % "1.7.7", 
    "org.apache.parquet" % "parquet-hadoop" % "1.8.1", 
    "org.apache.parquet" % "parquet-avro" % "1.8.1", 
    "io.confluent" % "kafka-avro-serializer" % "3.2.2", 
    "com.fasterxml.jackson.core" % "jackson-core" % "2.9.2" 
) 

答えて

0

org.apache.hadoop.fs.{FileSystem, Path}クラスは、hadoop-commonsプロジェクトにあります。

1

"Hadoopのフリー-FLINK" を構築するには、1.4リリースの一つの主要機能でした。 あなたがしなければならないのは、あなたのクラスパスにHadoopの依存関係を含めることですかchangelogsを引用:

...これはまた、あなたがそのようにBucketingSinkやRollingSinkなどあなたがHDFSにコネクタを使用の場合、で、意味HadoopにバンドルされたFlinkディストリビューションを使用するか、アプリケーション用のjarファイルをビルドするときにHadoopの依存関係を必ずインクルードする必要があります。

+0

私が含める必要がある依存関係を追跡しようと思います。私はまた、s3に寄木張りを書くために依存関係を含める必要があるのか​​、それとも今、Flink 1.4でこれを行う他の方法があるのだろうかと思っています。 – moku

関連する問題