Skip to content

Commit

Permalink
User/t eitanmoed/distributed tracing (#298)
Browse files Browse the repository at this point in the history
  • Loading branch information
enmoed committed Jun 26, 2023
1 parent e908bff commit fae7704
Show file tree
Hide file tree
Showing 36 changed files with 1,037 additions and 71 deletions.
48 changes: 41 additions & 7 deletions data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.exceptions.KustoClientInvalidConnectionStringException;
import com.microsoft.azure.kusto.data.exceptions.KustoServiceQueryError;
import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity;
import com.microsoft.azure.kusto.data.instrumentation.SupplierTwoExceptions;
import com.microsoft.azure.kusto.data.instrumentation.TraceableAttributes;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.ParseException;
Expand All @@ -23,6 +26,7 @@
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -106,9 +110,28 @@ public KustoOperationResult execute(String database, String command) throws Data

@Override
public KustoOperationResult execute(String database, String command, ClientRequestProperties properties) throws DataServiceException, DataClientException {
String response = executeToJsonResult(database, command, properties);

CommandType commandType = determineCommandType(command);
return MonitoredActivity.invoke(
(SupplierTwoExceptions<KustoOperationResult, DataServiceException, DataClientException>) () -> executeImpl(database, command, properties,
commandType),
commandType.getActivityTypeSuffix().concat(".execute"),
updateAndGetExecuteTracingAttributes(database, properties));
}

private Map<String, String> updateAndGetExecuteTracingAttributes(String database, TraceableAttributes traceableAttributes) {
Map<String, String> attributes = new HashMap<>();
attributes.put("cluster", clusterUrl);
attributes.put("database", database);
if (traceableAttributes != null) {
attributes.putAll(traceableAttributes.getTracingAttributes());
}
return attributes;
}

@NotNull
private KustoOperationResult executeImpl(String database, String command, ClientRequestProperties properties, CommandType commandType)
throws DataServiceException, DataClientException {
String response = executeToJsonResult(database, command, properties);
String clusterEndpoint = String.format(commandType.getEndpoint(), clusterUrl);
try {
return new KustoOperationResult(response, clusterEndpoint.endsWith("v2/rest/query") ? "v2" : "v1");
Expand Down Expand Up @@ -154,11 +177,14 @@ public String executeToJsonResult(String database, String command, ClientRequest
} catch (KustoClientInvalidConnectionStringException e) {
throw new DataClientException(clusterUrl, e.getMessage(), e);
}

addCommandHeaders(headers);
String jsonPayload = generateCommandPayload(database, command, properties);
StringEntity requestEntity = new StringEntity(jsonPayload, ContentType.APPLICATION_JSON);
return Utils.post(httpClient, clusterEndpoint, requestEntity, timeoutMs + CLIENT_SERVER_DELTA_IN_MILLISECS, headers);
// trace execution
return MonitoredActivity.invoke(
(SupplierTwoExceptions<String, DataServiceException, DataClientException>) () -> Utils.post(httpClient, clusterEndpoint, requestEntity,
timeoutMs + CLIENT_SERVER_DELTA_IN_MILLISECS, headers),
commandType.getActivityTypeSuffix().concat(".executeToJsonResult"));
}

private void validateEndpoint() throws DataServiceException, KustoClientInvalidConnectionStringException {
Expand Down Expand Up @@ -212,7 +238,12 @@ private KustoOperationResult executeStreamingIngestImpl(String clusterEndpoint,
// We use UncloseableStream to prevent HttpClient From closing it
AbstractHttpEntity entity = isStreamSource ? new InputStreamEntity(new UncloseableStream(stream))
: new StringEntity(new IngestionSourceStorage(blobUrl).toString(), ContentType.APPLICATION_JSON);
String response = Utils.post(httpClient, clusterEndpoint, entity, timeoutMs + CLIENT_SERVER_DELTA_IN_MILLISECS, headers);
String response;
// trace executeStreamingIngest
response = MonitoredActivity.invoke(
(SupplierTwoExceptions<String, DataServiceException, DataClientException>) () -> Utils.post(httpClient, clusterEndpoint, entity,
timeoutMs + CLIENT_SERVER_DELTA_IN_MILLISECS, headers),
"ClientImpl.executeStreamingIngest");
return new KustoOperationResult(response, "v1");
} catch (KustoServiceQueryError e) {
throw new DataClientException(clusterEndpoint, "Error converting json response to KustoOperationResult:" + e.getMessage(), e);
Expand Down Expand Up @@ -292,8 +323,11 @@ public InputStream executeStreamingQuery(String database, String command, Client
} catch (KustoClientInvalidConnectionStringException e) {
throw new DataClientException(clusterUrl, e.getMessage(), e);
}

return Utils.postToStreamingOutput(httpClient, clusterEndpoint, jsonPayload, timeoutMs + CLIENT_SERVER_DELTA_IN_MILLISECS, headers);
// trace httpCall
return MonitoredActivity.invoke(
(SupplierTwoExceptions<InputStream, DataServiceException, DataClientException>) () -> Utils.postToStreamingOutput(httpClient, clusterEndpoint,
jsonPayload, timeoutMs + CLIENT_SERVER_DELTA_IN_MILLISECS, headers),
"ClientImpl.executeStreamingQuery", updateAndGetExecuteTracingAttributes(database, properties));
}

private long determineTimeout(ClientRequestProperties properties, CommandType commandType, String clusterUrl) throws DataClientException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import com.microsoft.azure.kusto.data.format.CslRealFormat;
import com.microsoft.azure.kusto.data.format.CslTimespanFormat;
import com.microsoft.azure.kusto.data.format.CslUuidFormat;
import com.microsoft.azure.kusto.data.instrumentation.TraceableAttributes;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.ParseException;
import org.jetbrains.annotations.NotNull;

import java.io.Serializable;
import java.time.Duration;
Expand All @@ -36,7 +38,7 @@
* For a complete list of available client request properties
* check out https://docs.microsoft.com/en-us/azure/kusto/api/netfx/request-properties#list-of-clientrequestproperties
*/
public class ClientRequestProperties implements Serializable {
public class ClientRequestProperties implements Serializable, TraceableAttributes {
public static final String OPTION_SERVER_TIMEOUT = "servertimeout";
/*
* Matches valid Kusto Timespans: Optionally negative, optional number of days followed by a period, optionally up to 24 as hours followed by a colon,
Expand Down Expand Up @@ -287,6 +289,12 @@ Iterator<HashMap.Entry<String, Object>> getOptions() {
return options.entrySet().iterator();
}

public Map<String, String> getTracingAttributes() {
Map<String, String> attributes = new HashMap<>();
attributes.put("clientRequestId", getClientRequestId());
return attributes;
}

String getTimeoutAsString(Object timeoutObj) {
String timeoutString = "";
if (timeoutObj instanceof Long) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.microsoft.azure.kusto.data.exceptions.JsonPropertyMissingException;
import com.microsoft.azure.kusto.data.exceptions.KustoServiceQueryError;
import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity;
import com.microsoft.azure.kusto.data.instrumentation.SupplierOneException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -37,12 +39,19 @@ public class KustoOperationResult implements Iterator<KustoResultSetTable> {
private final ObjectMapper objectMapper = Utils.getObjectMapper();

public KustoOperationResult(String response, String version) throws KustoServiceQueryError {
MonitoredActivity.invoke((SupplierOneException<Void, KustoServiceQueryError>) () -> {
KustoOperationResultImpl(response, version);
return null;
}, "KustoOperationResult.createFromResponse");
it = resultTables.iterator();
}

private void KustoOperationResultImpl(String response, String version) throws KustoServiceQueryError {
if (version.contains("v2")) {
createFromV2Response(response);
} else {
createFromV1Response(response);
}
it = resultTables.iterator();
}

public List<KustoResultSetTable> getResultTables() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.jetbrains.annotations.NotNull;

public class AccessTokenTokenProvider extends TokenProviderBase {
public static final String ACCESS_TOKEN_TOKEN_PROVIDER = "AccessTokenTokenProvider";
private final String accessToken;

AccessTokenTokenProvider(@NotNull String clusterUrl, @NotNull String accessToken) throws URISyntaxException {
Expand All @@ -16,7 +17,12 @@ public class AccessTokenTokenProvider extends TokenProviderBase {
}

@Override
public String acquireAccessToken() {
protected String acquireAccessTokenImpl() {
return accessToken;
}

@Override
protected String getAuthMethod() {
return ACCESS_TOKEN_TOKEN_PROVIDER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.jetbrains.annotations.Nullable;

public class ApplicationCertificateTokenProvider extends ConfidentialAppTokenProviderBase {
public static final String APPLICATION_CERTIFICATE_TOKEN_PROVIDER = "ApplicationCertificateTokenProvider";
private final IClientCertificate clientCertificate;

ApplicationCertificateTokenProvider(@NotNull String clusterUrl, @NotNull String applicationClientId, @NotNull IClientCertificate clientCertificate,
Expand All @@ -33,4 +34,9 @@ protected IConfidentialClientApplication getClientApplication() throws Malformed
return clientApplication = builder
.build();
}

@Override
protected String getAuthMethod() {
return APPLICATION_CERTIFICATE_TOKEN_PROVIDER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

public class ApplicationKeyTokenProvider extends ConfidentialAppTokenProviderBase {
private final IClientSecret clientSecret;
public static final String APPLICATION_KEY_TOKEN_PROVIDER = "ApplicationKeyTokenProvider";

ApplicationKeyTokenProvider(@NotNull String clusterUrl, @NotNull String applicationClientId, @NotNull IClientSecret clientSecret,
String authorityId, @Nullable HttpClient httpClient) throws URISyntaxException {
Expand All @@ -31,4 +32,9 @@ protected IConfidentialClientApplication getClientApplication() throws Malformed
}
return authority.build();
}

@Override
protected String getAuthMethod() {
return APPLICATION_KEY_TOKEN_PROVIDER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.concurrent.Callable;

public class CallbackTokenProvider extends TokenProviderBase {
public static final String CALLBACK_TOKEN_PROVIDER = "CallbackTokenProvider";
private final CallbackTokenProviderFunction tokenProvider;

CallbackTokenProvider(@NotNull String clusterUrl, @NotNull Callable<String> tokenProvider) throws URISyntaxException {
Expand All @@ -27,11 +28,16 @@ public class CallbackTokenProvider extends TokenProviderBase {
}

@Override
public String acquireAccessToken() throws DataClientException {
protected String acquireAccessTokenImpl() throws DataClientException {
try {
return tokenProvider.apply(httpClient);
} catch (Exception e) {
throw new DataClientException(clusterUrl, e.getMessage(), e);
}
}

@Override
protected String getAuthMethod() {
return CALLBACK_TOKEN_PROVIDER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@

package com.microsoft.azure.kusto.data.auth;

import com.microsoft.azure.kusto.data.instrumentation.SupplierTwoExceptions;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity;
import org.apache.http.client.HttpClient;

import org.jetbrains.annotations.NotNull;
Expand All @@ -18,17 +22,23 @@ public abstract class CloudDependentTokenProviderBase extends TokenProviderBase
private static final String ERROR_INVALID_SERVICE_RESOURCE_URL = "Error determining scope due to invalid Kusto Service Resource URL";
protected final Set<String> scopes = new HashSet<>();
private boolean initialized = false;
private CloudInfo cloudInfo;

CloudDependentTokenProviderBase(@NotNull String clusterUrl, @Nullable HttpClient httpClient) throws URISyntaxException {
super(clusterUrl, httpClient);
}

@Override
synchronized void initialize() throws DataClientException, DataServiceException {
if (initialized) {
return;
}

initializeWithCloudInfo(CloudInfo.retrieveCloudInfoForCluster(clusterUrl, httpClient));
// trace retrieveCloudInfo
cloudInfo = MonitoredActivity.invoke(
(SupplierTwoExceptions<CloudInfo, DataClientException, DataServiceException>) () -> CloudInfo.retrieveCloudInfoForCluster(clusterUrl,
httpClient),
"CloudDependentTokenProviderBase.retrieveCloudInfo", getTracingAttributes());
initializeWithCloudInfo(cloudInfo);
initialized = true;
}

Expand All @@ -41,10 +51,12 @@ protected void initializeWithCloudInfo(CloudInfo cloudInfo) throws DataClientExc
}

@Override
public String acquireAccessToken() throws DataServiceException, DataClientException {
initialize();
return acquireAccessTokenImpl();
public Map<String, String> getTracingAttributes() {
Map<String, String> attributes = super.getTracingAttributes();
if (cloudInfo != null) {
attributes.putAll(cloudInfo.getTracingAttributes());
}
attributes.put("http.url", clusterUrl);
return attributes;
}

protected abstract String acquireAccessTokenImpl() throws DataServiceException, DataClientException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.azure.kusto.data.HttpClientFactory;
import com.microsoft.azure.kusto.data.instrumentation.SupplierOneException;
import com.microsoft.azure.kusto.data.UriUtils;
import com.microsoft.azure.kusto.data.Utils;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;

import com.microsoft.azure.kusto.data.instrumentation.TraceableAttributes;
import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.util.EntityUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.Closeable;
Expand All @@ -23,7 +26,7 @@
import java.util.Map;
import java.util.Objects;

public class CloudInfo {
public class CloudInfo implements TraceableAttributes {
private static final Map<String, CloudInfo> cache = new HashMap<>();

public static final String METADATA_ENDPOINT = "v1/rest/auth/metadata";
Expand Down Expand Up @@ -95,7 +98,10 @@ public static CloudInfo retrieveCloudInfoForCluster(String clusterUrl,
HttpGet request = new HttpGet(UriUtils.appendPathToUri(clusterUrl, METADATA_ENDPOINT));
request.addHeader(HttpHeaders.ACCEPT_ENCODING, "gzip,deflate");
request.addHeader(HttpHeaders.ACCEPT, "application/json");
HttpResponse response = localHttpClient.execute(request);

// trace CloudInfo.httpCall
HttpResponse response = MonitoredActivity.invoke((SupplierOneException<HttpResponse, IOException>) () -> localHttpClient.execute(request),
"CloudInfo.httpCall");
try {
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == 200) {
Expand Down Expand Up @@ -190,6 +196,13 @@ public String getKustoServiceResourceId() {
return kustoServiceResourceId;
}

@Override
public Map<String, String> getTracingAttributes() {
Map<String, String> attributes = new HashMap<>();
attributes.put("resource", kustoServiceResourceId);
return attributes;
}

public String getFirstPartyAuthorityUrl() {
return firstPartyAuthorityUrl;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import java.util.function.Consumer;

public class DeviceAuthTokenProvider extends PublicAppTokenProviderBase {

public static final String DEVICE_AUTH_TOKEN_PROVIDER = "DeviceAuthTokenProvider";

public DeviceAuthTokenProvider(@NotNull String clusterUrl, String authorityId, @Nullable HttpClient httpClient) throws URISyntaxException {
super(clusterUrl, authorityId, httpClient);
}
Expand All @@ -25,4 +28,9 @@ protected IAuthenticationResult acquireNewAccessToken() {
DeviceCodeFlowParameters deviceCodeFlowParams = DeviceCodeFlowParameters.builder(scopes, deviceCodeConsumer).build();
return clientApplication.acquireToken(deviceCodeFlowParams).join();
}

@Override
protected String getAuthMethod() {
return DEVICE_AUTH_TOKEN_PROVIDER;
}
}
Loading

0 comments on commit fae7704

Please sign in to comment.