2017-09-14 10 views
1

私はApache Cassandra 3.x.xについて学んでいます。私はいくつかのものを開発しようとしています。CassandraのUnixTimestampをTIMEUUIDに変換する

id (UUID - Primary Key) | Message (TEXT) | REQ_Timestamp (TIMEUUID) | Now_Timestamp (TIMEUUID) 

REQ_Timestampメッセージがフロントエンドのレベルでクライアントを残された時間があります。問題は、私はこれらの列が含まれているカサンドラのテーブルにデータを格納したいということです。一方、Now_Timestampは、メッセージが最終的にCassandraに保存される時刻です。私は両方のタイムスタンプが必要です。なぜなら、データが安全に保存されるまで、その起点からの要求を処理するのにかかる時間を測定したいからです。

Now_Timestampを作成するのは簡単ですが、now()関数を使用するだけで自動的にTIMEUUIDが生成されます。この問題はREQ_Timestampで発生します。そのUnixタイムスタンプをTIMEUUIDに変換してCassandraに保存させるにはどうすればいいですか?これも可能ですか?

私のバックエンドのアーキテクチャは次のとおりです。フロントエンドからJSONのデータを処理し、それをKafkaに格納するWebサービスにデータを取得します。その後、Spark Streamingの仕事がそのカフカのログを取り、それをカサンドラに置きます。

これは私のWebサービスで、データをカフカに入れます。

@Path("/") 
public class MemoIn { 

    @POST 
    @Path("/in") 
    @Consumes(MediaType.APPLICATION_JSON) 
    @Produces(MediaType.TEXT_PLAIN) 
    public Response goInKafka(InputStream incomingData){ 
     StringBuilder bld = new StringBuilder(); 
     try { 
      BufferedReader in = new BufferedReader(new InputStreamReader(incomingData)); 
      String line = null; 
      while ((line = in.readLine()) != null) { 
       bld.append(line); 
      } 
     } catch (Exception e) { 
      System.out.println("Error Parsing: - "); 
     } 
     System.out.println("Data Received: " + bld.toString()); 

     JSONObject obj = new JSONObject(bld.toString()); 
     String line = obj.getString("id_memo") + "|" + obj.getString("id_writer") + 
           "|" + obj.getString("id_diseased") 
           + "|" + obj.getString("memo") + "|" + obj.getLong("req_timestamp"); 

     try { 
      KafkaLogWriter.addToLog(line); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 

     return Response.status(200).entity(line).build(); 
    } 


} 

は、ここに私のカフカライター

package main.java.vcemetery.webservice; 

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import java.util.Properties; 
import org.apache.kafka.clients.producer.Producer; 

public class KafkaLogWriter { 

    public static void addToLog(String memo)throws Exception { 
     // private static Scanner in; 
      String topicName = "MemosLog"; 

      /* 
      First, we set the properties of the Kafka Log 
      */ 
      Properties props = new Properties(); 
      props.put("bootstrap.servers", "localhost:9092"); 
      props.put("acks", "all"); 
      props.put("retries", 0); 
      props.put("batch.size", 16384); 
      props.put("linger.ms", 1); 
      props.put("buffer.memory", 33554432); 
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

      // We create the producer 
      Producer<String, String> producer = new KafkaProducer<>(props); 
      // We send the line into the producer 
      producer.send(new ProducerRecord<>(topicName, memo)); 
      // We close the producer 
      producer.close(); 

    } 
} 

だと最終的にここに私が

public class MemoStream { 

    public static void main(String[] args) throws Exception { 
     Logger.getLogger("org").setLevel(Level.ERROR); 
     Logger.getLogger("akka").setLevel(Level.ERROR); 

     // Create the context with a 1 second batch size 
     SparkConf sparkConf = new SparkConf().setAppName("KafkaSparkExample").setMaster("local[2]"); 
     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10)); 

     Map<String, Object> kafkaParams = new HashMap<>(); 
     kafkaParams.put("bootstrap.servers", "localhost:9092"); 
     kafkaParams.put("key.deserializer", StringDeserializer.class); 
     kafkaParams.put("value.deserializer", StringDeserializer.class); 
     kafkaParams.put("group.id", "group1"); 
     kafkaParams.put("auto.offset.reset", "latest"); 
     kafkaParams.put("enable.auto.commit", false); 

     /* Se crea un array con los tópicos a consultar, en este caso solamente un tópico */ 
     Collection<String> topics = Arrays.asList("MemosLog"); 

     final JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = 
       KafkaUtils.createDirectStream(
         ssc, 
         LocationStrategies.PreferConsistent(), 
         ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams) 
       ); 

     kafkaStream.mapToPair(record -> new Tuple2<>(record.key(), record.value())); 
     // Split each bucket of kafka data into memos a splitable stream 
     JavaDStream<String> stream = kafkaStream.map(record -> (record.value().toString())); 
     // Then, we split each stream into lines or memos 
     JavaDStream<String> memos = stream.flatMap(x -> Arrays.asList(x.split("\n")).iterator()); 
     /* 
     To split each memo into sections of ids and messages, we have to use the code \\ plus the character 
      */ 
     JavaDStream<String> sections = memos.flatMap(y -> Arrays.asList(y.split("\\|")).iterator()); 
     sections.print(); 
     sections.foreachRDD(rdd -> { 
      rdd.foreachPartition(partitionOfRecords -> { 
       //We establish the connection with Cassandra 
       Cluster cluster = null; 
       try { 
        cluster = Cluster.builder() 
          .withClusterName("VCemeteryMemos") // ClusterName 
          .addContactPoint("127.0.0.1") // Host IP 
          .build(); 

       } finally { 
        if (cluster != null) cluster.close(); 
       } 
       while(partitionOfRecords.hasNext()){ 


       } 
      }); 
     }); 

     ssc.start(); 
     ssc.awaitTermination(); 

    } 
} 

は、事前にありがとう私のスパークストリーミングジョブの持っているものです。

答えて

1

カサンドラには変換機能がありませんから UNIXタイムスタンプ。あなたはクライアント側で変換を行う必要があります。

参考:https://docs.datastax.com/en/cql/3.3/cql/cql_reference/timeuuid_functions_r.html

+0

私が質問をしたときに、私は実際に同じ文書を参照しました。どのようにクライアント側で変換を行う上で任意のアイデアですか?私はここで立ち往生している。 –

+0

使用するクライアントによって異なります。それはdatastax javadriverですか?おそらく、あなたが行っていることのためにあなたのコードのいくつかを示すことができます。 –

+0

私のバックエンドのアーキテクチャは次のとおりです。フロントエンドからJSONのデータを処理し、それをKafkaに格納するWebサービスにデータを取得します。その後、Spark Streamingの仕事がそのカフカのログを取り、それをカサンドラに置きます。私はWebService/Kafkaコードとこれまでに書いたSparkコードでオリジナルの投稿を編集します。 –

関連する問題