2016-04-10 8 views
1

csv、tsvなどからローをインポートするためのFirehoseを持っているデフォルトの例とは異なり、データベースからレコードをインポートしてドルイドに挿入できるようにしますか?何かご意見は?ここでDruid:データベースからレコードをインポートするFirehose

は私が考えていたものです -

"firehose": { 
    "type" : "database", 
     "datasource" : { 
       "connectURI" : "jdbc:mysql://localhost:3306/test", 
       "user" : "druid", 
       "password" : "xyz123" 
     }, 
     "query" : "select * from table" 
     "frequency" : "P1M" 
} 

我々はおそらくJNDIデータソースといくつかの他経由の接続を得るためにそれを拡張することができます。この種の実装に問題はありますか?

+0

CSVファイルにデータベースからエクスポートするためにシンプルにして、通常の方法を使用することができますか? – Nikem

+0

もちろん、データのサイズが問題です。 – jagamot

答えて

0
 
How about this idea? It's custom firehose for jdbc ingestion. 
In this case, only supports one time query ingestion. 
https://github.com/sirpkt/druid/tree/jdbc_firehose/extensions-contrib/jdbc-firehose 
This is code snippet. Using DBI library try to get result set from existing database server. 
public Firehose connect(final MapInputRowParser parser) throws IOException, ParseException, IllegalArgumentException 
    { 
    if (columns != null) { 
     verifyParserSpec(parser.getParseSpec(), columns); 
    } 

    final Handle handle = new DBI(
     connectorConfig.getConnectURI(), 
     connectorConfig.getUser(), 
     connectorConfig.getPassword() 
    ).open(); 

    final String query = makeQuery(columns); 

    final ResultIterator<InputRow> rowIterator = handle 
     .createQuery(query) 
     .map(
      new ResultSetMapper<InputRow>() 
      { 
       List<String> queryColumns = (columns == null) ? Lists.<String>newArrayList(): columns; 

       @Override 
       public InputRow map(
        final int index, 
        final ResultSet r, 
        final StatementContext ctx 
      ) throws SQLException 
       { 
       try { 
        if (queryColumns.size() == 0) 
        { 
        ResultSetMetaData metadata = r.getMetaData(); 
        for (int idx = 1; idx <= metadata.getColumnCount(); idx++) 
        { 
         queryColumns.add(metadata.getColumnName(idx)); 
        } 
        Preconditions.checkArgument(queryColumns.size() > 0, 
         String.format("No column in table [%s]", table)); 
        verifyParserSpec(parser.getParseSpec(), queryColumns); 
        } 
        ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder(); 
        for (String column: queryColumns) { 
        builder.put(column, r.getObject(column)); 
        } 
        return parser.parse(builder.build()); 

       } catch(IllegalArgumentException e) { 
        throw new SQLException(e); 
       } 
       } 
      } 
     ).iterator(); 
関連する問題