Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## Unreleased
* Add client operation correlation logging: `FunctionInvocationId` is now propagated via gRPC metadata to the host for client operations, enabling correlation with host logs.

## v1.6.2
* Fixing gRPC channel shutdown ([#249](https://github.com/microsoft/durabletask-java/pull/249))
Expand Down
10 changes: 10 additions & 0 deletions azurefunctions/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ version = '1.6.2'
archivesBaseName = 'durabletask-azure-functions'

def protocVersion = '3.12.0'
def grpcVersion = '1.59.0'

repositories {
maven {
Expand All @@ -21,12 +22,21 @@ dependencies {
api project(':client')
implementation group: 'com.microsoft.azure.functions', name: 'azure-functions-java-library', version: '3.0.0'
implementation "com.google.protobuf:protobuf-java:${protocVersion}"
implementation "io.grpc:grpc-api:${grpcVersion}"
compileOnly "com.microsoft.azure.functions:azure-functions-java-spi:1.0.0"

testImplementation(platform('org.junit:junit-bom:5.7.2'))
testImplementation('org.junit.jupiter:junit-jupiter')
testImplementation('org.mockito:mockito-core:4.11.0')
}

sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8

test {
useJUnitPlatform()
}

publishing {
repositories {
maven {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.microsoft.durabletask.DurableTaskGrpcClientBuilder;
import com.microsoft.durabletask.OrchestrationMetadata;
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
import com.microsoft.durabletask.azurefunctions.internal.FunctionInvocationIdInterceptor;

import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
Expand All @@ -29,6 +30,7 @@ public class DurableClientContext {
private String taskHubName;
private String requiredQueryStringParameters;
private DurableTaskClient client;
private String functionInvocationId;

/**
* Gets the name of the client binding's task hub.
Expand All @@ -39,6 +41,18 @@ public String getTaskHubName() {
return this.taskHubName;
}

/**
* Sets the function invocation ID for correlation with host-side logs.
* <p>
* Call this method before calling {@link #getClient()} to enable correlation
* between client operations and host-side logs.
*
* @param invocationId the Azure Functions invocation ID
*/
public void setFunctionInvocationId(String invocationId) {
this.functionInvocationId = invocationId;
}

/**
* Gets the durable task client associated with the current function invocation.
*
Expand All @@ -56,7 +70,14 @@ public DurableTaskClient getClient() {
throw new IllegalStateException("The client context RPC base URL was invalid!", ex);
}

this.client = new DurableTaskGrpcClientBuilder().port(rpcURL.getPort()).build();
DurableTaskGrpcClientBuilder builder = new DurableTaskGrpcClientBuilder().port(rpcURL.getPort());

// Add interceptor for function invocation ID correlation if set
if (this.functionInvocationId != null && !this.functionInvocationId.isEmpty()) {
builder.addInterceptor(new FunctionInvocationIdInterceptor(this.functionInvocationId));
}

this.client = builder.build();
return this.client;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.microsoft.durabletask.azurefunctions.internal;

import io.grpc.*;

/**
* A gRPC client interceptor that adds the Azure Functions invocation ID to outgoing calls
* for correlation with host-side logs.
*/
public final class FunctionInvocationIdInterceptor implements ClientInterceptor {
private static final String INVOCATION_ID_METADATA_KEY_NAME = "x-azure-functions-invocationid";
private static final Metadata.Key<String> INVOCATION_ID_KEY =
Metadata.Key.of(INVOCATION_ID_METADATA_KEY_NAME, Metadata.ASCII_STRING_MARSHALLER);

private final String invocationId;

/**
* Creates a new interceptor that will add the specified invocation ID to all gRPC calls.
*
* @param invocationId the Azure Functions invocation ID to add to calls
*/
public FunctionInvocationIdInterceptor(String invocationId) {
this.invocationId = invocationId;
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next) {

return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
if (invocationId != null && !invocationId.isEmpty()) {
headers.put(INVOCATION_ID_KEY, invocationId);
}
super.start(responseListener, headers);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.durabletask.azurefunctions.internal;

import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;

/**
* Tests for FunctionInvocationIdInterceptor.
*/
public class FunctionInvocationIdInterceptorTests {

@Test
public void constructor_acceptsValidInvocationId() {
// Act & Assert - no exception should be thrown
FunctionInvocationIdInterceptor interceptor = new FunctionInvocationIdInterceptor("valid-id");
assertNotNull(interceptor);
}

@Test
public void constructor_acceptsNull() {
// Act & Assert - no exception should be thrown
FunctionInvocationIdInterceptor interceptor = new FunctionInvocationIdInterceptor(null);
assertNotNull(interceptor);
}

@Test
public void constructor_acceptsEmptyString() {
// Act & Assert - no exception should be thrown
FunctionInvocationIdInterceptor interceptor = new FunctionInvocationIdInterceptor("");
assertNotNull(interceptor);
}

@Test
public void constructor_acceptsWhitespaceString() {
// Act & Assert - no exception should be thrown
FunctionInvocationIdInterceptor interceptor = new FunctionInvocationIdInterceptor(" ");
assertNotNull(interceptor);
}

@Test
public void constructor_acceptsUuidFormat() {
// Act & Assert - no exception should be thrown with UUID format (common invocation ID format)
FunctionInvocationIdInterceptor interceptor = new FunctionInvocationIdInterceptor("550e8400-e29b-41d4-a716-446655440000");
assertNotNull(interceptor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
sidecarGrpcChannel = this.managedSidecarChannel;
}

// Apply any interceptors that were configured in the builder
List<ClientInterceptor> interceptors = builder.getInterceptors();
if (!interceptors.isEmpty()) {
sidecarGrpcChannel = ClientInterceptors.intercept(sidecarGrpcChannel, interceptors);
}

this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
package com.microsoft.durabletask;

import io.grpc.Channel;
import io.grpc.ClientInterceptor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Builder class for constructing new {@link DurableTaskClient} objects that communicate with a sidecar process
Expand All @@ -13,6 +18,7 @@ public final class DurableTaskGrpcClientBuilder {
int port;
Channel channel;
String defaultVersion;
List<ClientInterceptor> interceptors = new ArrayList<>();

/**
* Sets the {@link DataConverter} to use for converting serializable data payloads.
Expand Down Expand Up @@ -65,6 +71,32 @@ public DurableTaskGrpcClientBuilder defaultVersion(String defaultVersion) {
return this;
}

/**
* Adds a gRPC client interceptor to be applied to all gRPC calls made by the client.
* <p>
* Interceptors can be used to add custom headers, logging, or other cross-cutting concerns
* to gRPC calls. Multiple interceptors can be added and will be applied in the order they
* were added.
*
* @param interceptor the gRPC client interceptor to add
* @return this builder object
*/
public DurableTaskGrpcClientBuilder addInterceptor(ClientInterceptor interceptor) {
if (interceptor != null) {
this.interceptors.add(interceptor);
}
return this;
}

/**
* Gets the list of interceptors that have been added to this builder.
*
* @return an unmodifiable list of interceptors
*/
List<ClientInterceptor> getInterceptors() {
return Collections.unmodifiableList(this.interceptors);
}

/**
* Initializes a new {@link DurableTaskClient} object with the settings specified in the current builder object.
* @return a new {@link DurableTaskClient} object
Expand Down
Loading