Skip to content

Commit 69c1150

Browse files
committed
feat(query): add streaming query with a timeunit
1 parent 738acd5 commit 69c1150

File tree

3 files changed

+125
-0
lines changed

3 files changed

+125
-0
lines changed

src/main/java/org/influxdb/InfluxDB.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,25 @@ public void write(final String database, final String retentionPolicy,
538538
public void query(Query query, int chunkSize, BiConsumer<Cancellable, QueryResult> onNext, Runnable onComplete,
539539
Consumer<Throwable> onFailure);
540540

541+
/**
542+
* Execute a streaming query against a database.
543+
*
544+
* @param query
545+
* the query to execute.
546+
* @param timeUnit
547+
* the time unit of the results.
548+
* @param chunkSize
549+
* the number of QueryResults to process in one chunk.
550+
* @param onNext
551+
* the consumer to invoke for each received QueryResult; with capability to discontinue a streaming query
552+
* @param onComplete
553+
* the onComplete to invoke for successfully end of stream
554+
* @param onFailure
555+
* the consumer for error handling
556+
*/
557+
public void query(Query query, TimeUnit timeUnit, int chunkSize, BiConsumer<Cancellable, QueryResult> onNext, Runnable onComplete,
558+
Consumer<Throwable> onFailure);
559+
541560
/**
542561
* Execute a query against a database.
543562
*

src/main/java/org/influxdb/impl/InfluxDBImpl.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,90 @@ public void onFailure(final Call<ResponseBody> call, final Throwable t) {
713713
});
714714
}
715715

716+
/**
717+
* {@inheritDoc}
718+
*/
719+
@Override
720+
public void query(final Query query, final TimeUnit timeUnit, final int chunkSize, final BiConsumer<Cancellable, QueryResult> onNext,
721+
final Runnable onComplete, final Consumer<Throwable> onFailure) {
722+
Call<ResponseBody> call;
723+
if (query.hasBoundParameters()) {
724+
if (query.requiresPost()) {
725+
call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize,
726+
query.getParameterJsonWithUrlEncoded());
727+
} else {
728+
call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize,
729+
query.getParameterJsonWithUrlEncoded());
730+
}
731+
} else {
732+
if (query.requiresPost()) {
733+
call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize);
734+
} else {
735+
call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize);
736+
}
737+
}
738+
739+
call.enqueue(new Callback<ResponseBody>() {
740+
@Override
741+
public void onResponse(final Call<ResponseBody> call, final Response<ResponseBody> response) {
742+
743+
Cancellable cancellable = new Cancellable() {
744+
@Override
745+
public void cancel() {
746+
call.cancel();
747+
}
748+
749+
@Override
750+
public boolean isCanceled() {
751+
return call.isCanceled();
752+
}
753+
};
754+
755+
try {
756+
if (response.isSuccessful()) {
757+
ResponseBody chunkedBody = response.body();
758+
chunkProccesor.process(chunkedBody, cancellable, onNext, onComplete);
759+
} else {
760+
// REVIEW: must be handled consistently with IOException.
761+
ResponseBody errorBody = response.errorBody();
762+
if (errorBody != null) {
763+
InfluxDBException influxDBException = new InfluxDBException(errorBody.string());
764+
if (onFailure == null) {
765+
throw influxDBException;
766+
} else {
767+
onFailure.accept(influxDBException);
768+
}
769+
}
770+
}
771+
} catch (IOException e) {
772+
QueryResult queryResult = new QueryResult();
773+
queryResult.setError(e.toString());
774+
onNext.accept(cancellable, queryResult);
775+
//passing null onFailure consumer is here for backward compatibility
776+
//where the empty queryResult containing error is propagating into onNext consumer
777+
if (onFailure != null) {
778+
onFailure.accept(e);
779+
}
780+
} catch (Exception e) {
781+
call.cancel();
782+
if (onFailure != null) {
783+
onFailure.accept(e);
784+
}
785+
}
786+
787+
}
788+
789+
@Override
790+
public void onFailure(final Call<ResponseBody> call, final Throwable t) {
791+
if (onFailure == null) {
792+
throw new InfluxDBException(t);
793+
} else {
794+
onFailure.accept(t);
795+
}
796+
}
797+
});
798+
}
799+
716800
/**
717801
* {@inheritDoc}
718802
*/

src/main/java/org/influxdb/impl/InfluxDBService.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,24 @@ public Call<QueryResult> postQuery(@Query(DB) String db, @Query(EPOCH) String ep
7575
public Call<ResponseBody> postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query,
7676
@Query(CHUNK_SIZE) int chunkSize);
7777

78+
@Streaming
79+
@POST("query?chunked=true")
80+
@FormUrlEncoded
81+
public Call<ResponseBody> postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query, @Query(EPOCH) String epoch,
82+
@Query(CHUNK_SIZE) int chunkSize);
83+
7884
@Streaming
7985
@POST("query?chunked=true")
8086
@FormUrlEncoded
8187
public Call<ResponseBody> postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query,
8288
@Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params);
8389

90+
@Streaming
91+
@POST("query?chunked=true")
92+
@FormUrlEncoded
93+
public Call<ResponseBody> postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query, @Query(EPOCH) String epoch,
94+
@Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params);
95+
8496
@POST("query")
8597
@FormUrlEncoded
8698
public Call<QueryResult> postQuery(@Field(value = Q, encoded = true) String query);
@@ -90,8 +102,18 @@ public Call<ResponseBody> postQuery(@Query(DB) String db, @Field(value = Q, enco
90102
public Call<ResponseBody> query(@Query(DB) String db, @Query(value = Q, encoded = true) String query,
91103
@Query(CHUNK_SIZE) int chunkSize);
92104

105+
@Streaming
106+
@GET("query?chunked=true")
107+
public Call<ResponseBody> query(@Query(DB) String db, @Query(value = Q, encoded = true) String query, @Query(EPOCH) String epoch,
108+
@Query(CHUNK_SIZE) int chunkSize);
109+
93110
@Streaming
94111
@GET("query?chunked=true")
95112
public Call<ResponseBody> query(@Query(DB) String db, @Query(value = Q, encoded = true) String query,
96113
@Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params);
114+
115+
@Streaming
116+
@GET("query?chunked=true")
117+
public Call<ResponseBody> query(@Query(DB) String db, @Query(value = Q, encoded = true) String query, @Query(EPOCH) String epoch,
118+
@Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params);
97119
}

0 commit comments

Comments
 (0)