Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,23 @@ public static void main(String[] args) throws Exception {
@VisibleForTesting
public static void main(Function<String, String> environmentVarGetter) throws Exception {
JvmInitializers.runOnStartup();
System.out.format("SDK Fn Harness started%n");
System.out.format("Harness ID %s%n", environmentVarGetter.apply(HARNESS_ID));
System.out.format(
"Logging location %s%n", environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));
System.out.format(
"Control location %s%n", environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
System.out.format(
"Status location %s%n", environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));

Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
getApiServiceDescriptor(environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor =
getApiServiceDescriptor(environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
Endpoints.ApiServiceDescriptor statusApiServiceDescriptor =
environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null
? null
: getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
String id = environmentVarGetter.apply(HARNESS_ID);

System.out.format("SDK Fn Harness started%n");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand these System.out were due to that logging not initialized yet. However I also remember raw stdout could be tagged as ERROR level logging in Dataflow cloud logging (also possible in other runners). Consider removing potential log spam

System.out.format("Harness ID %s%n", id);
System.out.format("Logging location %s%n", loggingApiServiceDescriptor);
System.out.format("Control location %s%n", controlApiServiceDescriptor);
System.out.format("Status location %s%n", statusApiServiceDescriptor);

String pipelineOptionsJson = environmentVarGetter.apply(PIPELINE_OPTIONS);
// Try looking for a file first. If that exists it should override PIPELINE_OPTIONS to avoid
// maxing out the kernel's environment space
Expand All @@ -179,16 +186,6 @@ public static void main(Function<String, String> environmentVarGetter) throws Ex

PipelineOptions options = PipelineOptionsTranslation.fromJson(pipelineOptionsJson);

Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
getApiServiceDescriptor(environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));

Endpoints.ApiServiceDescriptor controlApiServiceDescriptor =
getApiServiceDescriptor(environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));

Endpoints.ApiServiceDescriptor statusApiServiceDescriptor =
environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null
? null
: getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
String runnerCapabilitesOrNull = environmentVarGetter.apply(RUNNER_CAPABILITIES);
Set<String> runnerCapabilites =
runnerCapabilitesOrNull == null
Expand Down Expand Up @@ -415,7 +412,7 @@ private BeamFnApi.ProcessBundleDescriptor loadDescriptor(String id) {
// directExecutor() when building the channel.
BeamFnControlClient control =
new BeamFnControlClient(
controlStub.withExecutor(MoreExecutors.directExecutor()),
controlStub.withExecutor(MoreExecutors.directExecutor()).withWaitForReady(),
outboundObserverFactory,
executorService,
handlers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ public StreamWriter(ManagedChannel channel) {
this.streamPhaser = new AdvancingPhaser(1);
this.channel = channel;

BeamFnLoggingGrpc.BeamFnLoggingStub stub = BeamFnLoggingGrpc.newStub(channel);
BeamFnLoggingGrpc.BeamFnLoggingStub stub =
BeamFnLoggingGrpc.newStub(channel).withWaitForReady();
this.inboundObserver = new LogControlObserver();
this.outboundObserver =
new DirectStreamObserver<BeamFnApi.LogEntry.List>(
Expand Down
Loading