Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Managed streaming - better estimation for decision for streaming vs queuing #368

Open
wants to merge 20 commits into
base: master
Choose a base branch
from

Conversation

ohadbitt
Copy link
Collaborator

@ohadbitt ohadbitt commented Jun 30, 2024

Added

  • A new heuristic for choosing between queuing and streaming in Managed streaming client, the default behavior is using the
    an estimation against the 4mb limit after dividing or multiplying by some factor described by the consts:

// Regardless of the format, we don't want to stream more than 10mb
int MAX_STREAMING_STREAM_SIZE_BYTES = 10 * 1024 * 1024;
// Used against the users input of raw data size
int MAX_STREAMING_RAW_SIZE_BYTES = 6 * 1024 * 1024;
double JSON_UNCOMPRESSED_FACTOR = 1.5d;
int NON_BINARY_FACTOR = 2;
double BINARY_COMPRESSED_FACTOR = 2d;
double BINARY_UNCOMPRESSED_FACTOR = 1.5d;

This will also allow users to stream bigger than 4mb non-compressed data

  • disableRetries option to client options - default is true only for streaming clients as if stream is not repeatable it will simply fails - better to let the user deal with it (alternative considered was to always create repeatable streams - like in managed streaming)
  • better error messages

Copy link

github-actions bot commented Jun 30, 2024

Test Results

326 tests  +4   318 ✅ +4   3m 32s ⏱️ + 1m 0s
 27 suites +2     8 💤 ±0 
 27 files   +2     0 ❌ ±0 

Results for commit 92efb55. ± Comparison against base commit a00bd15.

♻️ This comment has been updated with latest results.

@AsafMah
Copy link
Contributor

AsafMah commented Jul 2, 2024

This is a very big PR, can you summarize the changes in more detail?

Also, for disableRetries, maybe we want to fold this in to a more robust Policy object like @yogilad suggested and worked on.

@@ -107,6 +107,7 @@
<ignoredUsedUndeclaredDependencies>
<dependency>com.microsoft.azure:msal4j:jar</dependency>
<dependency>io.projectreactor:reactor-core:jar</dependency>
<dependency>com.fasterxml.jackson.core:jackson-core:jar</dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? don't we have problems with jackson in general?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It just means we get it from Data

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was sure we removed it as a dep because of blacklisting, maybe just the core part is ok? this is confusing me

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

im not sure we can have anything good here
but anyway Spark today releases a shaded Jar - i hope this will obsolete those problems

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue was with one of our connectors, which blacklisted jackson. But maybe it was only jackson json and not jackson core. We'll see I guess


if (properties.isDisableRetries()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is in the http level, which I think is completely separate from what we do in streaming ingest.
Did you check what it actually does and when it runs? I think it does save the request, which will work for the "normal" streaming client

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well unless i missed it - it didnt - and thats why i use it here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe i should try and make it throw again

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you?

CHANGELOG.md Outdated Show resolved Hide resolved
@@ -107,6 +107,7 @@
<ignoredUsedUndeclaredDependencies>
<dependency>com.microsoft.azure:msal4j:jar</dependency>
<dependency>io.projectreactor:reactor-core:jar</dependency>
<dependency>com.fasterxml.jackson.core:jackson-core:jar</dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was sure we removed it as a dep because of blacklisting, maybe just the core part is ok? this is confusing me

this.rawSizeInBytes = rawSizeInBytes;
}

// An estimation of the raw (uncompressed, un-indexed) size of the data, for binary formatted files - use only if known
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not about v2 or not v2, it's about code organization - Both StreamSourceInfo and FileSourceInfo support Compression, but in Stream it's a part of the class and for file it's calculated outside of the class.

I think it should be part of the abstract class, and FileSourceInfo can define it to be the extension or whatever.

try {
return ingestFromStreamImpl(streamSourceInfo,
ingestionProperties);
} catch (IOException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then why do you need it in the signature? Shouldn't it always be wrapped and then it's also not a breaking change

Copy link
Contributor

@AsafMah AsafMah left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically done, just go over the rest of the comments. Most of them are nothing

@@ -4,6 +4,18 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unknown
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Unreleased] (look at the url for example)

@@ -107,6 +107,7 @@
<ignoredUsedUndeclaredDependencies>
<dependency>com.microsoft.azure:msal4j:jar</dependency>
<dependency>io.projectreactor:reactor-core:jar</dependency>
<dependency>com.fasterxml.jackson.core:jackson-core:jar</dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue was with one of our connectors, which blacklisted jackson. But maybe it was only jackson json and not jackson core. We'll see I guess


if (properties.isDisableRetries()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you?

@@ -73,7 +75,9 @@ public static StreamingClient createStreamingClient(ConnectionStringBuilder csb)
* @throws URISyntaxException if the cluster URL is invalid
*/
public static StreamingClient createStreamingClient(ConnectionStringBuilder csb, HttpClientProperties properties) throws URISyntaxException {
return new ClientImpl(csb, properties);
HttpClientProperties httpClientProperties = Optional.ofNullable(properties)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That answers 1 (though I don't like it both places, but whatever)

But unlike the one from HttpClientFactory, you do disableRetries here, which I think should be commented

try {
return ingestFromStreamImpl(streamSourceInfo,
ingestionProperties);
} catch (IOException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember now - I think what I meant is that you can catch and throw our exception inside the impl.

EIther way it probably doesn't matter if the impl isn't public

public static final String CLASS_NAME = ManagedStreamingIngestClient.class.getSimpleName();
final QueuedIngestClientImpl queuedIngestClient;
final StreamingIngestClient streamingIngestClient;
private final ExponentialRetry exponentialRetryTemplate;
private CloseableHttpClient httpClient = null;
private ManagedStreamingQueuingPolicy queuingPolicy = ManagedStreamingQueuingPolicy.Default;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean it's unlikely to grow if we're going to replace most of it for v2, but sure

* If the size of the stream is bigger than {@value MAX_STREAMING_SIZE_BYTES}, it will fall back to the queued streaming client.
* By default the policy for choosing a queued ingestion on the first try is the checking of weather the size of the estimated
* raw stream size (a conversion to compressed CSV) is bigger than 4MB, it will fall back to the queued streaming client.
* Use SourceInfo.size to override size estimations, alternatively - use setQueuingPolicy to override the predicate heuristics.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. the name is still wrong (it's SourceInfo.ge/settRawSizeInBytes)
  2. I think just putting it right next to the previous line with this wording is confusing, since it makes it seem that they are related.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants