I am trying to write the results of a Spark Streaming job to a Druid datasource. Spark submits the job successfully, and Druid starts an indexing task, but nothing gets written. I cannot write to Druid via Spark Streaming and Tranquility, as shown below.

My code and logs are as follows:

import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext 
import org.apache.kafka.clients.consumer.ConsumerRecord 
import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark.streaming.kafka010._ 
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 
import scala.util.parsing.json._ 
import com.metamx.tranquility.spark.BeamRDD._ 
import org.joda.time.{DateTime, DateTimeZone} 


object MyDirectStreamDriver { 
    def main(args:Array[String]) { 

    val sc = new SparkContext() 

    val ssc = new StreamingContext(sc, Minutes(5)) 

    val kafkaParams = Map[String, Object](
       "bootstrap.servers" -> "[$hadoopURL]:6667", 
       "key.deserializer" -> classOf[StringDeserializer], 
       "value.deserializer" -> classOf[StringDeserializer], 
       "group.id" -> "use_a_separate_group_id_for_each_stream", 
       "auto.offset.reset" -> "latest", 
       "enable.auto.commit" -> (false: java.lang.Boolean) 

    ) 


    val eventStream = KafkaUtils.createDirectStream[String, String](
         ssc, 
         PreferConsistent, 
         Subscribe[String, String](Array("events_test"), kafkaParams)) 


    // Split concatenated JSON objects, parse each one, and build MyEvent(time, oid, status). 
    val t = eventStream.map(record => record.value). 
          flatMap(_.split("(?<=\\}),(?=\\{)")). 
          map(JSON.parseRaw(_).getOrElse(new JSONObject(Map("" -> ""))).asInstanceOf[JSONObject]). 
          map(x => (new DateTime(), x.obj.getOrElse("OID", "").asInstanceOf[String], x.obj.getOrElse("STATUS", "").asInstanceOf[Double].toInt)). 
          map(x => MyEvent(x._1, x._2, x._3)) 
    t.saveAsTextFiles("/user/username/result", "txt") 
    t.foreachRDD(rdd => rdd.propagate(new MyEventBeamFactory)) 


    ssc.start 
    ssc.awaitTermination 
    } 
} 
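
For what it's worth, the split regex "(?<=\\}),(?=\\{)" assumes each Kafka record value holds a comma-separated run of JSON objects. Below is a minimal, self-contained sketch of just that parsing step (the sample record is made up; scala.util.parsing.json parses every JSON number as a Double, which is why the driver casts STATUS with asInstanceOf[Double].toInt): 

import scala.util.parsing.json.{JSON, JSONObject} 

object SplitParseDemo extends App { 
    val record = """{"OID":"0010","STATUS":1.0},{"OID":"0030","STATUS":1.0}""" 
    record 
        .split("(?<=\\}),(?=\\{)")               // split between "}" and "{" 
        .flatMap(JSON.parseRaw(_))               // Some(JSONType) for each valid chunk 
        .collect { case o: JSONObject => o.obj } // keep JSON objects, drop anything else 
        .foreach(println)                        // Map(OID -> 0010, STATUS -> 1.0), ... 
} 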

import com.fasterxml.jackson.annotation.JsonValue 
import com.metamx.tranquility.typeclass.Timestamper 
import com.metamx.common.scala.untyped._ // Dict, str, int, long helpers 

case class MyEvent(time: DateTime, oid: String, status: Int) 
{ 

    @JsonValue 
    def toMap: Map[String, Any] = Map(
        "timestamp" -> (time.getMillis / 1000), 
        "oid" -> oid, 
        "status" -> status 
    ) 
} 
object MyEvent { 
    implicit val MyEventTimestamper = new Timestamper[MyEvent] { 
        def timestamp(a: MyEvent) = a.time 
    } 

    val Columns = Seq("time", "oid", "status") 

    def fromMap(d: Dict): MyEvent = { 
        MyEvent(
            new DateTime(long(d("timestamp")) * 1000), 
            str(d("oid")), 
            int(d("status")) 
        ) 
    } 
} 
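
A note on @JsonValue: Tranquility serializes each event with Jackson before handing it to the Druid indexing task, and @JsonValue makes toMap the serialized form, so the timestamp travels as epoch seconds rather than an ISO string. A minimal sketch of what goes over the wire (JsonValueDemo is a hypothetical demo object; it assumes jackson-module-scala is on the classpath, which Tranquility pulls in): 

import com.fasterxml.jackson.databind.ObjectMapper 
import com.fasterxml.jackson.module.scala.DefaultScalaModule 

object JsonValueDemo extends App { 
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule) 
    val event = MyEvent(new DateTime("2017-12-28T13:10:00Z"), "0010", 1) 
    // @JsonValue tells Jackson to use toMap as the serialized form: 
    println(mapper.writeValueAsString(event)) 
    // {"timestamp":1514466600,"oid":"0010","status":1} -- epoch seconds, not ISO 
} 

This mismatch with the task's default "iso" timestampSpec is exactly what the answer below addresses. 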

import org.apache.curator.framework.CuratorFrameworkFactory 
import org.apache.curator.retry.BoundedExponentialBackoffRetry 
import com.metamx.tranquility.beam.{Beam, ClusteredBeamTuning} 
import com.metamx.tranquility.druid.{DruidBeams, DruidEnvironment, DruidLocation, DruidRollup, SpecificDruidDimensions} 
import com.metamx.tranquility.spark.BeamFactory 
import io.druid.granularity._ 
import io.druid.query.aggregation.LongSumAggregatorFactory 
import com.metamx.common.Granularity 
import org.joda.time.Period 

class MyEventBeamFactory extends BeamFactory[MyEvent] 
{ 
    // Return a singleton, so the same connection is shared across all tasks in the same JVM. 
    def makeBeam: Beam[MyEvent] = MyEventBeamFactory.BeamInstance 
} 

object MyEventBeamFactory { 
    val BeamInstance: Beam[MyEvent] = { 
        // Tranquility uses ZooKeeper (through the Curator framework) for coordination. 
        val curator = CuratorFrameworkFactory.newClient(
            "{IP_2}:2181", 
            new BoundedExponentialBackoffRetry(100, 3000, 5) 
        ) 
        curator.start() 

        val indexService = DruidEnvironment("druid/overlord") // Your overlord's druid.service 
        val discoveryPath = "/druid/discovery"  // Your overlord's druid.discovery.curator.path 
        val dataSource = "events_druid" 
        val dimensions = IndexedSeq("oid") 
        val aggregators = Seq(new LongSumAggregatorFactory("status", "status")) 

        // Expects MyEvent.time to be a Joda DateTime. 
        DruidBeams 
            .builder((event: MyEvent) => event.time) 
            .curator(curator) 
            .discoveryPath(discoveryPath) 
            .location(DruidLocation(indexService, dataSource)) 
            .rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularities.MINUTE)) 
            .tuning(
                ClusteredBeamTuning(
                    segmentGranularity = Granularity.HOUR, 
                    windowPeriod = new Period("PT10M"), 
                    partitions = 1, 
                    replicants = 1 
                ) 
            ) 
            .buildBeam() 
    } 
} 

This is the log of the Druid indexing task (index_realtime_events_druid_2017-12-28T13:00:00.000Z_0_0):

2017-12-28T13:05:19,299 INFO [main] io.druid.indexing.worker.executor.ExecutorLifecycle - Running with task: { 
    "type" : "index_realtime", 
    "id" : "index_realtime_events_druid_2017-12-28T13:00:00.000Z_0_0", 
    "resource" : { 
    "availabilityGroup" : "events_druid-2017-12-28T13:00:00.000Z-0000", 
    "requiredCapacity" : 1 
    }, 
    "spec" : { 
    "dataSchema" : { 
     "dataSource" : "events_druid", 
     "parser" : { 
     "type" : "map", 
     "parseSpec" : { 
      "format" : "json", 
      "timestampSpec" : { 
      "column" : "timestamp", 
      "format" : "iso", 
      "missingValue" : null 
      }, 
      "dimensionsSpec" : { 
      "dimensions" : [ "oid" ], 
      "spatialDimensions" : [ ] 
      } 
     } 
     }, 
     "metricsSpec" : [ { 
     "type" : "longSum", 
     "name" : "status", 
     "fieldName" : "status", 
     "expression" : null 
     } ], 
     "granularitySpec" : { 
     "type" : "uniform", 
     "segmentGranularity" : "HOUR", 
     "queryGranularity" : { 
      "type" : "duration", 
      "duration" : 60000, 
      "origin" : "1970-01-01T00:00:00.000Z" 
     }, 
     "rollup" : true, 
     "intervals" : null 
     } 
    }, 
    "ioConfig" : { 
     "type" : "realtime", 
     "firehose" : { 
     "type" : "clipped", 
     "delegate" : { 
      "type" : "timed", 
      "delegate" : { 
      "type" : "receiver", 
      "serviceName" : "firehose:druid:overlord:events_druid-013-0000-0000", 
      "bufferSize" : 100000 
      }, 
      "shutoffTime" : "2017-12-28T14:15:00.000Z" 
     }, 
     "interval" : "2017-12-28T13:00:00.000Z/2017-12-28T14:00:00.000Z" 
     }, 
     "firehoseV2" : null 
    }, 
    "tuningConfig" : { 
     "type" : "realtime", 
     "maxRowsInMemory" : 75000, 
     "intermediatePersistPeriod" : "PT10M", 
     "windowPeriod" : "PT10M", 
     "basePersistDirectory" : "/tmp/1514466313873-0", 
     "versioningPolicy" : { 
     "type" : "intervalStart" 
     }, 
     "rejectionPolicy" : { 
     "type" : "none" 
     }, 
     "maxPendingPersists" : 0, 
     "shardSpec" : { 
     "type" : "linear", 
     "partitionNum" : 0 
     }, 
     "indexSpec" : { 
     "bitmap" : { 
      "type" : "concise" 
     }, 
     "dimensionCompression" : "lz4", 
     "metricCompression" : "lz4", 
     "longEncoding" : "longs" 
     }, 
     "buildV9Directly" : true, 
     "persistThreadPriority" : 0, 
     "mergeThreadPriority" : 0, 
     "reportParseExceptions" : false, 
     "handoffConditionTimeout" : 0, 
     "alertTimeout" : 0 
    } 
    }, 
    "context" : null, 
    "groupId" : "index_realtime_events_druid", 
    "dataSource" : "events_druid" 
} 
2017-12-28T13:05:19,312 INFO [main] io.druid.indexing.worker.executor.ExecutorLifecycle - Attempting to lock file[/apps/druid/tasks/index_realtime_events_druid_2017-12-28T13:00:00.000Z_0_0/lock]. 
2017-12-28T13:05:19,313 INFO [main] io.druid.indexing.worker.executor.ExecutorLifecycle - Acquired lock file[/apps/druid/tasks/index_realtime_events_druid_2017-12-28T13:00:00.000Z_0_0/lock] in 1ms. 
2017-12-28T13:05:19,317 INFO [task-runner-0-priority-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Running task: index_realtime_events_druid_2017-12-28T13:00:00.000Z_0_0 
2017-12-28T13:05:19,323 INFO [task-runner-0-priority-0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_realtime_events_druid_2017-12-28T13:00:00.000Z_0_0] location changed to [TaskLocation{host='hadooptest9.{host}', port=8100}]. 
2017-12-28T13:05:19,323 INFO [task-runner-0-priority-0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_realtime_events_druid_2017-12-28T13:00:00.000Z_0_0] status changed to [RUNNING]. 
2017-12-28T13:05:19,327 INFO [main] org.eclipse.jetty.server.Server - jetty-9.3.19.v20170502 
2017-12-28T13:05:19,350 INFO [task-runner-0-priority-0] io.druid.segment.realtime.plumber.RealtimePlumber - Creating plumber using rejectionPolicy[[email protected]925d517] 
2017-12-28T13:05:19,351 INFO [task-runner-0-priority-0] io.druid.server.coordination.CuratorDataSegmentServerAnnouncer - Announcing self[DruidServerMetadata{name='hadooptest9.{host}:8100', host='hadooptest9.{host}:8100', maxSize=0, tier='_default_tier', type='realtime', priority='0'}] at [/druid/announcements/hadooptest9.{host}:8100] 
2017-12-28T13:05:19,382 INFO [task-runner-0-priority-0] io.druid.segment.realtime.plumber.RealtimePlumber - Expect to run at [2017-12-28T14:10:00.000Z] 
2017-12-28T13:05:19,392 INFO [task-runner-0-priority-0] io.druid.segment.realtime.plumber.RealtimePlumber - Starting merge and push. 
2017-12-28T13:05:19,392 INFO [task-runner-0-priority-0] io.druid.segment.realtime.plumber.RealtimePlumber - Found [0] segments. Attempting to hand off segments that start before [1970-01-01T00:00:00.000Z]. 
2017-12-28T13:05:19,392 INFO [task-runner-0-priority-0] io.druid.segment.realtime.plumber.RealtimePlumber - Found [0] sinks to persist and merge 
2017-12-28T13:05:19,451 INFO [task-runner-0-priority-0] io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory - Connecting firehose: firehose:druid:overlord:events_druid-013-0000-0000 
2017-12-28T13:05:19,453 INFO [task-runner-0-priority-0] io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory - Found chathandler of class[io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider] 
2017-12-28T13:05:19,453 INFO [task-runner-0-priority-0] io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider - Registering Eventhandler[firehose:druid:overlord:events_druid-013-0000-0000] 
2017-12-28T13:05:19,454 INFO [task-runner-0-priority-0] io.druid.curator.discovery.CuratorServiceAnnouncer - Announcing service[DruidNode{serviceName='firehose:druid:overlord:events_druid-013-0000-0000', host='hadooptest9.{host}', port=8100}] 
2017-12-28T13:05:19,502 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Registering com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider as a provider class 
2017-12-28T13:05:19,502 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Registering com.fasterxml.jackson.jaxrs.smile.JacksonSmileProvider as a provider class 
2017-12-28T13:05:19,502 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Registering io.druid.server.initialization.jetty.CustomExceptionMapper as a provider class 
2017-12-28T13:05:19,502 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Registering io.druid.server.StatusResource as a root resource class 
2017-12-28T13:05:19,505 INFO [main] com.sun.jersey.server.impl.application.WebApplicationImpl - Initiating Jersey application, version 'Jersey: 1.19.3 10/24/2016 03:43 PM' 
2017-12-28T13:05:19,515 INFO [task-runner-0-priority-0] io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider - Registering Eventhandler[events_druid-013-0000-0000] 
2017-12-28T13:05:19,515 INFO [task-runner-0-priority-0] io.druid.curator.discovery.CuratorServiceAnnouncer - Announcing service[DruidNode{serviceName='events_druid-013-0000-0000', host='hadooptest9.{host}', port=8100}] 
2017-12-28T13:05:19,529 WARN [task-runner-0-priority-0] org.apache.curator.utils.ZKPaths - The version of ZooKeeper being used doesn't support Container nodes. CreateMode.PERSISTENT will be used instead. 
2017-12-28T13:05:19,535 INFO [task-runner-0-priority-0] io.druid.server.metrics.EventReceiverFirehoseRegister - Registering EventReceiverFirehoseMetric for service [firehose:druid:overlord:events_druid-013-0000-0000] 
2017-12-28T13:05:19,536 INFO [task-runner-0-priority-0] io.druid.data.input.FirehoseFactory - Firehose created, will shut down at: 2017-12-28T14:15:00.000Z 
2017-12-28T13:05:19,574 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding io.druid.server.initialization.jetty.CustomExceptionMapper to GuiceManagedComponentProvider with the scope "Singleton" 
2017-12-28T13:05:19,576 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider to GuiceManagedComponentProvider with the scope "Singleton" 
2017-12-28T13:05:19,583 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding com.fasterxml.jackson.jaxrs.smile.JacksonSmileProvider to GuiceManagedComponentProvider with the scope "Singleton" 
2017-12-28T13:05:19,845 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding io.druid.server.http.security.StateResourceFilter to GuiceInstantiatedComponentProvider 
2017-12-28T13:05:19,863 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding io.druid.server.http.SegmentListerResource to GuiceInstantiatedComponentProvider 
2017-12-28T13:05:19,874 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding io.druid.server.QueryResource to GuiceInstantiatedComponentProvider 
2017-12-28T13:05:19,876 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding io.druid.segment.realtime.firehose.ChatHandlerResource to GuiceInstantiatedComponentProvider 
2017-12-28T13:05:19,880 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding io.druid.query.lookup.LookupListeningResource to GuiceInstantiatedComponentProvider 
2017-12-28T13:05:19,882 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding io.druid.query.lookup.LookupIntrospectionResource to GuiceInstantiatedComponentProvider 
2017-12-28T13:05:19,883 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding io.druid.server.StatusResource to GuiceManagedComponentProvider with the scope "Undefined" 
2017-12-28T13:05:19,896 WARN [main] com.sun.jersey.spi.inject.Errors - The following warnings have been detected with resource and/or provider classes: 
    WARNING: A HTTP GET method, public void io.druid.server.http.SegmentListerResource.getSegments(long,long,long,javax.servlet.http.HttpServletRequest) throws java.io.IOException, MUST return a non-void type. 
2017-12-28T13:05:19,905 INFO [main] org.eclipse.jetty.server.handler.ContextHandler - Started [email protected]{/,null,AVAILABLE} 
2017-12-28T13:05:19,914 INFO [main] org.eclipse.jetty.server.AbstractConnector - Started [email protected]{HTTP/1.1,[http/1.1]}{0.0.0.0:8100} 
2017-12-28T13:05:19,914 INFO [main] org.eclipse.jetty.server.Server - Started @6014ms 
2017-12-28T13:05:19,915 INFO [main] io.druid.java.util.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking start method[public void io.druid.server.listener.announcer.ListenerResourceAnnouncer.start()] on object[[email protected]]. 
2017-12-28T13:05:19,919 INFO [main] io.druid.server.listener.announcer.ListenerResourceAnnouncer - Announcing start time on [/druid/listeners/lookups/__default/hadooptest9.{host}:8100] 
2017-12-28T13:05:20,517 WARN [task-runner-0-priority-0] io.druid.segment.realtime.firehose.PredicateFirehose - [0] InputRow(s) ignored as they do not satisfy the predicate 

This is the payload of index_realtime_events_druid_2017-12-28T13:00:00.000Z_0_0:

{ 
  "task" : "index_realtime_events_druid_2017-12-28T13:00:00.000Z_0_0", 
  "payload" : { 
    "id" : "index_realtime_events_druid_2017-12-28T13:00:00.000Z_0_0", 
    "resource" : { 
      "availabilityGroup" : "events_druid-2017-12-28T13:00:00.000Z-0000", 
      "requiredCapacity" : 1 
    }, 
    "spec" : { 
      "dataSchema" : { 
        "dataSource" : "events_druid", 
        "parser" : { 
          "type" : "map", 
          "parseSpec" : { 
            "format" : "json", 
            "timestampSpec" : { 
              "column" : "timestamp", 
              "format" : "iso", 
              "missingValue" : null 
            }, 
            "dimensionsSpec" : { 
              "dimensions" : [ "oid" ], 
              "spatialDimensions" : [ ] 
            } 
          } 
        }, 
        "metricsSpec" : [ { 
          "type" : "longSum", 
          "name" : "status", 
          "fieldName" : "status", 
          "expression" : null 
        } ], 
        "granularitySpec" : { 
          "type" : "uniform", 
          "segmentGranularity" : "HOUR", 
          "queryGranularity" : { 
            "type" : "duration", 
            "duration" : 60000, 
            "origin" : "1970-01-01T00:00:00.000Z" 
          }, 
          "rollup" : true, 
          "intervals" : null 
        } 
      }, 
      "ioConfig" : { 
        "type" : "realtime", 
        "firehose" : { 
          "type" : "clipped", 
          "delegate" : { 
            "type" : "timed", 
            "delegate" : { 
              "type" : "receiver", 
              "serviceName" : "firehose:druid:overlord:events_druid-013-0000-0000", 
              "bufferSize" : 100000 
            }, 
            "shutoffTime" : "2017-12-28T14:15:00.000Z" 
          }, 
          "interval" : "2017-12-28T13:00:00.000Z/2017-12-28T14:00:00.000Z" 
        }, 
        "firehoseV2" : null 
      }, 
      "tuningConfig" : { 
        "type" : "realtime", 
        "maxRowsInMemory" : 75000, 
        "intermediatePersistPeriod" : "PT10M", 
        "windowPeriod" : "PT10M", 
        "basePersistDirectory" : "/tmp/1514466313873-0", 
        "versioningPolicy" : { "type" : "intervalStart" }, 
        "rejectionPolicy" : { "type" : "none" }, 
        "maxPendingPersists" : 0, 
        "shardSpec" : { "type" : "linear", "partitionNum" : 0 }, 
        "indexSpec" : { 
          "bitmap" : { "type" : "concise" }, 
          "dimensionCompression" : "lz4", 
          "metricCompression" : "lz4", 
          "longEncoding" : "longs" 
        }, 
        "buildV9Directly" : true, 
        "persistThreadPriority" : 0, 
        "mergeThreadPriority" : 0, 
        "reportParseExceptions" : false, 
        "handoffConditionTimeout" : 0, 
        "alertTimeout" : 0 
      } 
    }, 
    "context" : null, 
    "groupId" : "index_realtime_events_druid", 
    "dataSource" : "events_druid" 
  } 
} 

This is the tail end of the Spark job's stderr:

    17/12/28 14:50:09 INFO ZooKeeper: Client environment:os.version=3.10.0-514.10.2.el7.x86_64 
    17/12/28 14:50:09 INFO ZooKeeper: Client environment:user.name=yarn 
    17/12/28 14:50:09 INFO ZooKeeper: Client environment:user.home=/home/yarn 
    17/12/28 14:50:09 INFO ZooKeeper: Client environment:user.dir=/data1/hadoop/yarn/local/usercache/hdfs/appcache/application_1512485869804_6924/container_e58_1512485869804_6924_01_000002 
    17/12/28 14:50:09 INFO ZooKeeper: Initiating client connection, connectString={IP2}:2181 sessionTimeout=60000 [email protected] 
    17/12/28 14:50:09 INFO ClientCnxn: Opening socket connection to server {IP2}/{IP2}:2181. Will not attempt to authenticate using SASL (unknown error) 
    17/12/28 14:50:09 INFO ClientCnxn: Socket connection established, initiating session, client: /{IP6}:42704, server: {IP2}/{IP2}:2181 
    17/12/28 14:50:09 INFO ClientCnxn: Session establishment complete on server {IP2}/{IP2}:2181, sessionid = 0x25fa4ea15980119, negotiated timeout = 40000 
    17/12/28 14:50:10 INFO ConnectionStateManager: State change: CONNECTED 
    17/12/28 14:50:10 INFO Version: HV000001: Hibernate Validator 5.1.3.Final 
    17/12/28 14:50:10 INFO JsonConfigurator: Loaded class[class io.druid.guice.ExtensionsConfig] from props[druid.extensions.] as [ExtensionsConfig{searchCurrentClassloader=true, directory='extensions', hadoopDependenciesDir='hadoop-dependencies', hadoopContainerDruidClasspath='null', loadList=null}] 
    17/12/28 14:50:10 INFO LoggingEmitter: Start: started [true] 
    17/12/28 14:50:11 INFO FinagleRegistry: Adding resolver for scheme[disco]. 
    17/12/28 14:50:11 INFO CachedKafkaConsumer: Initial fetch for spark-executor-use_a_separate_group_id_for_each_stream events_test 0 6658 
    17/12/28 14:50:12 INFO ClusteredBeam: Global latestCloseTime[2017-12-28T12:00:00.000Z] for identifier[druid:overlord/events_druid] has moved past timestamp[2017-12-28T12:00:00.000Z], not creating merged beam 
    17/12/28 14:50:12 INFO ClusteredBeam: Turns out we decided not to actually make beams for identifier[druid:overlord/events_druid] timestamp[2017-12-28T12:00:00.000Z]. Returning None. 
    17/12/28 14:50:12 WARN MapPartitioner: Cannot partition object of class[class MyEvent] by time and dimensions. Consider implementing a Partitioner. 
    17/12/28 14:50:12 INFO ClusteredBeam: Global latestCloseTime[2017-12-28T12:00:00.000Z] for identifier[druid:overlord/events_druid] has moved past timestamp[2017-12-28T12:00:00.000Z], not creating merged beam 
    17/12/28 14:50:12 INFO ClusteredBeam: Turns out we decided not to actually make beams for identifier[druid:overlord/events_druid] timestamp[2017-12-28T12:00:00.000Z]. Returning None. 
    17/12/28 14:50:12 INFO ClusteredBeam: Global latestCloseTime[2017-12-28T12:00:00.000Z] for identifier[druid:overlord/events_druid] has moved past timestamp[2017-12-28T12:00:00.000Z], not creating merged beam 
    17/12/28 14:50:12 INFO ClusteredBeam: Turns out we decided not to actually make beams for identifier[druid:overlord/events_druid] timestamp[2017-12-28T12:00:00.000Z]. Returning None. 
    17/12/28 14:50:12 INFO ClusteredBeam: Global latestCloseTime[2017-12-28T12:00:00.000Z] for identifier[druid:overlord/events_druid] has moved past timestamp[2017-12-28T12:00:00.000Z], not creating merged beam 
    17/12/28 14:50:12 INFO ClusteredBeam: Turns out we decided not to actually make beams for identifier[druid:overlord/events_druid] timestamp[2017-12-28T12:00:00.000Z]. Returning None. 
    17/12/28 14:50:12 INFO ClusteredBeam: Global latestCloseTime[2017-12-28T12:00:00.000Z] for identifier[druid:overlord/events_druid] has moved past timestamp[2017-12-28T12:00:00.000Z], not creating merged beam 
    17/12/28 14:50:12 INFO ClusteredBeam: Turns out we decided not to actually make beams for identifier[druid:overlord/events_druid] timestamp[2017-12-28T12:00:00.000Z]. Returning None. 
    17/12/28 14:50:16 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1541 bytes result sent to driver 

I have also verified that the results are written to the text file, so the data is arriving and is well-formed. Here are a few lines of the text file:

MyEvent(2017-12-28T16:10:00.387+03:00,0010,1) 
MyEvent(2017-12-28T16:10:00.406+03:00,0030,1) 
MyEvent(2017-12-28T16:10:00.417+03:00,0010,1) 
MyEvent(2017-12-28T16:10:00.431+03:00,0010,1) 
MyEvent(2017-12-28T16:10:00.448+03:00,0010,1) 
MyEvent(2017-12-28T16:10:00.464+03:00,0030,1)  

Any help is greatly appreciated. Thanks.

Answer

This problem was solved by adding a timestampSpec to the DruidBeams builder. MyEvent.toMap serializes the timestamp as epoch seconds (time.getMillis / 1000), while the auto-generated task spec above expected the default "iso" format, which is presumably why nothing was indexed; declaring the format as "posix" fixes it:

import io.druid.data.input.impl.TimestampSpec 

DruidBeams 
    .builder((event: MyEvent) => event.time) 
    .curator(curator) 
    .discoveryPath(discoveryPath) 
    .location(DruidLocation(indexService, dataSource)) 
    .rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularities.MINUTE)) 
    .tuning(
        ClusteredBeamTuning(
            segmentGranularity = Granularity.HOUR, 
            windowPeriod = new Period("PT10M"), 
            partitions = 1, 
            replicants = 1 
        ) 
    ) 
    .timestampSpec(new TimestampSpec("timestamp", "posix", null)) 
    .buildBeam() 
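
With this change, the timestampSpec in the generated task spec should come out with "format" : "posix" instead of the "iso" seen in the task log above, roughly: 

"timestampSpec" : { 
    "column" : "timestamp", 
    "format" : "posix", 
    "missingValue" : null 
} 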