-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes needed for scaling up and running in Terra #1
Changes from 64 commits
14787c3
df817ed
2747ede
c7fe811
6c3284f
9a1f397
1ae8fdb
bd917fc
bb044e0
e2c8ffc
4123788
6064433
21ee544
1f04a73
7856d35
4b8493c
3b2e931
97208f8
73d2ce6
dce810c
2783557
919917b
41b7457
c89afd7
e3534d4
6f01587
c028c44
02d05f1
4e06990
2c60b6a
68f7c55
0221889
d16f30b
1bffd60
36af337
68ca3cb
ca20960
6cdd5e3
260c716
bd56331
656f322
7138c39
fb41381
0e421ab
beb8d38
c9e03fd
5e624f2
fc6ea5b
d65996e
0598f85
e57985d
35907bc
cd86f4c
b44c1e8
dd23a36
54627e0
75e752a
f6439a0
063aa7d
a1fba0d
15577f9
db3dc8d
6baf3ca
ccbc049
ee0dade
70498d9
24d25eb
b61e2d0
0a7dea9
0437928
f5ab3d0
9c7e85e
fd8790a
d72ab3b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,8 +12,12 @@ workflow JointGenotyping { | |
File unpadded_intervals_file | ||
|
||
String callset_name | ||
#TODO: make sample_name_map from the gvcf_paths? | ||
File sample_name_map | ||
|
||
File gvcf_paths_fofn | ||
File gvcf_path_indexes_fofn | ||
|
||
File ref_fasta | ||
File ref_fasta_index | ||
File ref_dict | ||
|
@@ -63,7 +67,8 @@ workflow JointGenotyping { | |
Boolean use_gnarly_genotyper = false | ||
Boolean use_allele_specific_annotations = true | ||
Boolean cross_check_fingerprints = true | ||
Boolean scatter_cross_check_fingerprints = false | ||
# If cross check fingerprints should be scattered, how many gvcfs per shard? Typically set to 1000. | ||
Int? cross_check_fingerprint_scatter_partition | ||
} | ||
|
||
Boolean allele_specific_annotations = !use_gnarly_genotyper && use_allele_specific_annotations | ||
|
@@ -73,8 +78,8 @@ workflow JointGenotyping { | |
|
||
Array[Array[String]] sample_name_map_lines_t = transpose(sample_name_map_lines) | ||
Array[String] sample_names_from_map = sample_name_map_lines_t[0] | ||
Array[File] gvcf_paths_from_map = sample_name_map_lines_t[1] | ||
Array[File] gvcf_index_paths_from_map = sample_name_map_lines_t[2] | ||
#Array[File] gvcf_paths_from_map = sample_name_map_lines_t[1] | ||
#Array[File] gvcf_index_paths_from_map = sample_name_map_lines_t[2] | ||
|
||
# Make a 2.5:1 interval number to samples in callset ratio interval list. | ||
# We allow overriding the behavior by specifying the desired number of vcfs | ||
|
@@ -92,8 +97,8 @@ workflow JointGenotyping { | |
|
||
#call Tasks.CheckSamplesUnique { | ||
# input: | ||
# sample_name_map = sample_name_map, | ||
# sample_num_threshold = 10 | ||
# sample_name_map = sample_name_map_for_fingerprinting, | ||
# sample_num_threshold = 1 | ||
#} | ||
|
||
call Tasks.SplitIntervalList { | ||
|
@@ -107,6 +112,27 @@ workflow JointGenotyping { | |
sample_names_unique_done = true | ||
} | ||
|
||
call Tasks.SplitFofn as SplitGvcfFofn { | ||
input: | ||
largeFofn = gvcf_paths_fofn | ||
} | ||
|
||
call Tasks.SplitFofn as SplitGvcfIndexFofn { | ||
input: | ||
largeFofn = gvcf_path_indexes_fofn | ||
} | ||
|
||
scatter (i in range(length(SplitGvcfFofn.tiny_fofns))) { | ||
Array[File] gvcf_path_arrays = read_lines(SplitGvcfFofn.tiny_fofns[i]) | ||
Array[File] gvcf_index_path_arrays = read_lines(SplitGvcfIndexFofn.tiny_fofns[i]) | ||
} | ||
|
||
Array[File] gvcf_paths = flatten(gvcf_path_arrays) | ||
Array[File] gvcf_path_indexes = flatten(gvcf_index_path_arrays) | ||
|
||
File header_vcf = gvcf_paths[0] | ||
File header_vcf_index = gvcf_path_indexes[0] | ||
|
||
Array[File] unpadded_intervals = SplitIntervalList.output_intervals | ||
|
||
scatter (idx in range(length(unpadded_intervals))) { | ||
|
@@ -117,9 +143,10 @@ workflow JointGenotyping { | |
# the Hellbender (GATK engine) team! | ||
call Tasks.ImportGVCFs { | ||
input: | ||
sample_names = sample_names_from_map, | ||
gvcf_files = gvcf_paths_from_map, | ||
gvcf_index_files = gvcf_index_paths_from_map, | ||
sample_name_map = sample_name_map, | ||
# need to provide an example header in order to stream from azure, so use the first gvcf | ||
header_vcf = header_vcf, | ||
header_vcf_index = header_vcf_index, | ||
interval = unpadded_intervals[idx], | ||
ref_fasta = ref_fasta, | ||
ref_fasta_index = ref_fasta_index, | ||
|
@@ -153,15 +180,13 @@ workflow JointGenotyping { | |
ref_fasta = ref_fasta, | ||
ref_fasta_index = ref_fasta_index, | ||
ref_dict = ref_dict, | ||
dbsnp_vcf = dbsnp_vcf, | ||
dbsnp_vcf = dbsnp_vcf | ||
} | ||
} | ||
|
||
Array[File] gnarly_gvcfs = GnarlyGenotyper.output_vcf | ||
|
||
call Tasks.GatherVcfs as TotallyRadicalGatherVcfs { | ||
input: | ||
input_vcfs = gnarly_gvcfs, | ||
input_vcf_fofn = write_lines(GnarlyGenotyper.output_vcf), | ||
output_vcf_name = callset_name + "." + idx + ".gnarly.vcf.gz", | ||
disk_size = large_disk | ||
} | ||
|
@@ -196,9 +221,10 @@ workflow JointGenotyping { | |
} | ||
} | ||
|
||
#TODO: I suspect having write_lines in the input here is breaking call caching | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does call caching work now on Azure? Does it work the same as in GCP (conceptually?) in terms of how it establishes identity? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It does! It works the same as GCP as far as I can tell. The suspicion in this TODO is because the write_lines makes a new temp file each time, but I didn't investigate further. Could definitely be something else that caused call caching to break on this task one time which I happened to notice. Also at the moment call caching only works with dockerhub (rather than Azure Container Registry), but it's mostly been consistent and working for me. |
||
call Tasks.GatherVcfs as SitesOnlyGatherVcf { | ||
input: | ||
input_vcfs = HardFilterAndMakeSitesOnlyVcf.sites_only_vcf, | ||
input_vcf_fofn = write_lines(HardFilterAndMakeSitesOnlyVcf.sites_only_vcf), | ||
output_vcf_name = callset_name + ".sites_only.vcf.gz", | ||
disk_size = medium_disk | ||
} | ||
|
@@ -336,9 +362,10 @@ workflow JointGenotyping { | |
# For small callsets we can gather the VCF shards and then collect metrics on it. | ||
# HUGE disk was failing in Azure... | ||
if (is_small_callset) { | ||
|
||
call Tasks.GatherVcfs as FinalGatherVcf { | ||
input: | ||
input_vcfs = ApplyRecalibration.recalibrated_vcf, | ||
input_vcf_fofn = write_lines(ApplyRecalibration.recalibrated_vcf), | ||
output_vcf_name = callset_name + ".vcf.gz", | ||
disk_size = large_disk | ||
} | ||
|
@@ -369,7 +396,7 @@ workflow JointGenotyping { | |
|
||
# CrossCheckFingerprints takes forever on large callsets. | ||
# We scatter over the input GVCFs to make things faster. | ||
if (scatter_cross_check_fingerprints) { | ||
if (defined(cross_check_fingerprint_scatter_partition)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm really curious about (a) what crosscheck fingerprints is doing for us here (are you comparing to external truth? Do you expect no samples to match? are samples on > 1 lane?) and then also (b) the reasoning for the scaling approach here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I answered this in Slack too, but the check here is to make sure that joint genotyping itself didn't swap samples, so we're checking the input gvcfs to the output vcf. This might be overkill, especially since we need to scatter it to work on a large number of samples. Might be better to spot check some random samples rather than truly check that every single sample wasn't swapped by our pipeline. |
||
call Tasks.GetFingerprintingIntervalIndices { | ||
input: | ||
unpadded_intervals = unpadded_intervals, | ||
|
@@ -384,37 +411,41 @@ workflow JointGenotyping { | |
|
||
call Tasks.GatherVcfs as GatherFingerprintingVcfs { | ||
input: | ||
input_vcfs = vcfs_to_fingerprint, | ||
input_vcf_fofn = write_lines(vcfs_to_fingerprint), | ||
output_vcf_name = callset_name + ".gathered.fingerprinting.vcf.gz", | ||
disk_size = medium_disk | ||
} | ||
|
||
call Tasks.SelectFingerprintSiteVariants { | ||
input: | ||
input_vcf = GatherFingerprintingVcfs.output_vcf, | ||
input_vcf_index = GatherFingerprintingVcfs.output_vcf_index, | ||
base_output_name = callset_name + ".fingerprinting", | ||
haplotype_database = haplotype_database, | ||
disk_size = medium_disk | ||
} | ||
|
||
call Tasks.PartitionSampleNameMap { | ||
input: | ||
sample_name_map = sample_name_map, | ||
line_limit = 1000 | ||
} | ||
|
||
scatter (idx in range(length(PartitionSampleNameMap.partitions))) { | ||
# Get partitions by partition number of gvcfs, including any remainder in the last partition | ||
# Subsetting happens in the CrossCheckFingerprints task | ||
Array[Int] partitions = range((num_gvcfs+cross_check_fingerprint_scatter_partition)/cross_check_fingerprint_scatter_partition) | ||
|
||
Array[File] files_in_partition = read_lines(PartitionSampleNameMap.partitions[idx]) | ||
scatter (idx in range(length(partitions))) { | ||
Int parition_scaled = (partitions[idx] + 1) * cross_check_fingerprint_scatter_partition | ||
|
||
call Tasks.CrossCheckFingerprint as CrossCheckFingerprintsScattered { | ||
input: | ||
gvcf_paths = files_in_partition, | ||
vcf_paths = vcfs_to_fingerprint, | ||
sample_name_map = sample_name_map, | ||
gvcf_paths_fofn = write_lines(gvcf_paths), | ||
gvcf_index_paths_fofn = write_lines(gvcf_path_indexes), | ||
vcf_paths_fofn = write_lines([SelectFingerprintSiteVariants.output_vcf]), | ||
vcf_index_paths_fofn = write_lines([SelectFingerprintSiteVariants.output_vcf_index]), | ||
sample_names_from_map_fofn = write_lines(sample_names_from_map), | ||
partition_index = parition_scaled, | ||
partition_ammount = cross_check_fingerprint_scatter_partition, | ||
gvcf_paths_length = length(gvcf_paths), | ||
haplotype_database = haplotype_database, | ||
output_base_name = callset_name + "." + idx, | ||
scattered = true | ||
scattered = true, | ||
disk = small_disk | ||
} | ||
} | ||
|
||
|
@@ -426,19 +457,19 @@ workflow JointGenotyping { | |
} | ||
} | ||
|
||
if (!scatter_cross_check_fingerprints) { | ||
|
||
scatter (line in sample_name_map_lines) { | ||
File gvcf_paths = line[1] | ||
} | ||
if (!defined(cross_check_fingerprint_scatter_partition)) { | ||
|
||
call Tasks.CrossCheckFingerprint as CrossCheckFingerprintSolo { | ||
input: | ||
gvcf_paths = gvcf_paths, | ||
vcf_paths = ApplyRecalibration.recalibrated_vcf, | ||
sample_name_map = sample_name_map, | ||
gvcf_paths_fofn = write_lines(gvcf_paths), | ||
gvcf_index_paths_fofn = write_lines(gvcf_path_indexes), | ||
vcf_paths_fofn = write_lines(ApplyRecalibration.recalibrated_vcf), | ||
vcf_index_paths_fofn = write_lines(ApplyRecalibration.recalibrated_vcf_index), | ||
sample_names_from_map_fofn = write_lines(sample_names_from_map), | ||
gvcf_paths_length = length(gvcf_paths), | ||
haplotype_database = haplotype_database, | ||
output_base_name = callset_name | ||
output_base_name = callset_name, | ||
disk = small_disk | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to be a lot of the changes -- moving from tasks taking an Array[File] to a fofn produced by write_lines. I'm guessing this is an Azure-ism?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, some of these should no longer be necessary. I cleaned some of this up.
For the ones that remain, I think we needed these to be FOFNs for two reasons: 1) localization_optional isn't implemented yet in Azure so the inputs need to be either FOFNs or Array[String] and more importantly 2) The SAS token environment variable provided by Cromwell is based on where the File input is located. Namely the FOFN File needs to have the same SAS token as the rest of the inputs and by providing the task a FOFN rather than Array[String] we tell Cromwell where to grab the SAS from.