Skip to content

Commit

Permalink
Personal tool to correct overlapping bases
Browse files Browse the repository at this point in the history
  • Loading branch information
nh13 committed Mar 10, 2022
1 parent 5375b25 commit f6e9f09
Show file tree
Hide file tree
Showing 5 changed files with 297 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
/*
* The MIT License
*
* Copyright (c) 2022 Fulcrum Genomics
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
*/

package com.fulcrumgenomics.personal.nhomer

import com.fulcrumgenomics.FgBioDef.{FgBioEnum, PathToBam, FilePath, SafelyClosable, forloop}
import com.fulcrumgenomics.bam.Bams
import com.fulcrumgenomics.bam.api.{SamRecord, SamSource, SamWriter}
import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool}
import com.fulcrumgenomics.commons.collection.ParIterator
import com.fulcrumgenomics.commons.util.LazyLogging
import com.fulcrumgenomics.commons.util.Threads.IterableThreadLocal
import com.fulcrumgenomics.sopt.{arg, clp}
import com.fulcrumgenomics.umi.SimpleConsensusCaller
import com.fulcrumgenomics.umi.VanillaUmiConsensusCallerOptions.{DefaultErrorRatePostUmi, DefaultErrorRatePreUmi}
import com.fulcrumgenomics.util.NumericTypes.PhredScore
import com.fulcrumgenomics.util.{Io, Metric, ProgressLogger}
import enumeratum.EnumEntry

import scala.collection.immutable


@clp(group = ClpGroups.SamOrBam, description=
"""
|Masks or corrects overlapping bases in FR read pairs.
""")
class CorrectOverlappingBases
( @arg(flag='i', doc="Input SAM or BAM file of aligned reads in coordinate order.") val input: PathToBam,
@arg(flag='o', doc="Output SAM or BAM file.") val output: PathToBam,
@arg(flag='m', doc="Output metrics file.") val metrics: FilePath,
@arg(flag='g', doc="Maximum error rate of a gapless alignment before switching to a gapped alignment") val maxGaplessErrorRate: Double = 1.0,
@arg(flag='1', doc="The Phred-scaled error rate for an error prior to the UMIs being integrated.") val errorRatePreUmi: PhredScore = DefaultErrorRatePreUmi,
@arg(flag='2', doc="The Phred-scaled error rate for an error post the UMIs have been integrated.") val errorRatePostUmi: PhredScore = DefaultErrorRatePostUmi,
@arg(doc="Mask all corrected bases") val mask: Boolean = false,
@arg(doc="The number of threads to use while consensus calling.") val threads: Int = 1
) extends FgBioTool with LazyLogging {
Io.assertReadable(input)
Io.assertCanWriteFile(output)

//private val matchScore: Int = 1
//private val aligner: Aligner = Aligner(matchScore, -4, -6, -1, Mode.Global)

private case class ThreadData(caller: SimpleConsensusCaller, templateMetric: CorrectOverlappingBasesMetric, basesMetric: CorrectOverlappingBasesMetric)
private object ThreadData {
def apply(): ThreadData = ThreadData(
caller = new SimpleConsensusCaller(
errorRatePreLabeling = errorRatePreUmi,
errorRatePostLabeling = errorRatePostUmi
),
templateMetric = CorrectOverlappingBasesMetric(tpe=CountType.Templates),
basesMetric = CorrectOverlappingBasesMetric(tpe=CountType.Bases)
)
}

override def execute(): Unit = {
val source = SamSource(input)
val writer = SamWriter(output, source.header)
val progress = new ProgressLogger(logger)
val templateIterator = Bams.templateIterator(source)
val threadData = new IterableThreadLocal(() => ThreadData())

ParIterator(templateIterator, threads=threads)
.map { template =>
val threadDatum = threadData.get()
threadDatum.synchronized {
// update metrics
threadDatum.templateMetric.total += 1
threadDatum.basesMetric.total += template.primaryReads.map(_.length).sum
// corrects
(template.r1, template.r2) match {
case (Some(r1), Some(r2)) if r1.mapped && r2.mapped && r1.matesOverlap.contains(true) && r1.isFrPair =>
if (r1.positiveStrand) correct(r1, r2, threadDatum) else correct(r2, r1, threadDatum)
case _ => ()
}
}
template.allReads.foreach(progress.record)
template
}.toAsync(ParIterator.DefaultChunkSize * 8).foreach { template =>
writer ++= template.allReads
}
progress.logLast()
source.safelyClose()
writer.close()

val templatesMetric = CorrectOverlappingBasesMetric(tpe=CountType.Templates)
val basesMetric = CorrectOverlappingBasesMetric(tpe=CountType.Bases)
threadData.foreach { datum =>
templatesMetric += datum.templateMetric
basesMetric += datum.basesMetric
}

Metric.write(metrics, templatesMetric, basesMetric)
}

private def positiveOrElse(readPos: Int, default: Int): Int = if (readPos <= 0) default else readPos

private def correct(r1: SamRecord, r2: SamRecord,
data: ThreadData): Unit = {
val r1OldBases = r1.basesString
val r2OldBases = r2.basesString

// TODO: adjust based on leading/trailing soft-clipping that may make lengths differ between extracted sequence
// Get the read sequences that overlap in R1 and R2 respectively, 0-based exclusive
val r1From = Math.max(0, r1.readPosAtRefPos(r2.start, returnLastBaseIfDeleted=false).getOrElse(0) - 1)
val r1Until = positiveOrElse(r1.readPosAtRefPos(r2.end, returnLastBaseIfDeleted=false).getOrElse(0), r1.length)
val r2From = Math.max(0, r2.readPosAtRefPos(r1.start, returnLastBaseIfDeleted=false).getOrElse(0) - 1)
val r2Until = positiveOrElse(r2.readPosAtRefPos(r1.end, returnLastBaseIfDeleted=false).getOrElse(0), r2.length)
val r1Sub = r1OldBases.slice(from=r1From, until=r1Until)
val r2Sub = r2OldBases.slice(from=r2From, until=r2Until)

data.templateMetric.overlapping += 1
data.basesMetric.overlapping += r1.length + r2.length

// For now, ignore indels, but it would be hard to handle them.
require(r1Sub.nonEmpty || r2Sub.nonEmpty)
if (r1Sub.length != r2Sub.length) {
data.templateMetric.mismatch_length += 1
data.basesMetric.mismatch_length += r1Sub.length + r2Sub.length
} else if (this.maxGaplessErrorRate < errorRate(r1Sub, r2Sub)) {
data.templateMetric.error_rate_too_high += 1
data.basesMetric.error_rate_too_high += r1Sub.length + r2Sub.length
} else {
val r1OldQuals = r1.qualsString
val r2OldQuals = r2.qualsString

// get the consensus
val result = data.caller.callBasesAndQualities(
sequences = Seq(r1Sub, r2Sub),
qualities = Seq(r1.quals.slice(from=r1From, until=r1Until), r2.quals.slice(from=r2From, until=r2Until))
)
val (bases, quals) = result match {
case (_, None) => throw new IllegalStateException("Bug: passed in two sequences, should have qualities returned")
case (b, Some(q)) => (b, q)
}

// replace the bases and qualities
r1.bases = r1OldBases.take(r1From) + bases + r1OldBases.drop(r1Until)
r1.quals = r1OldQuals.take(r1From) + quals + r1OldQuals.drop(r1Until)
r2.bases = r2OldBases.take(r2From) + bases + r2OldBases.drop(r2Until)
r2.quals = r2OldQuals.take(r2From) + quals + r2OldQuals.drop(r2Until)

val maybeMaskedBases: String = if (!mask) bases else {
val builder = new StringBuilder(r1Sub.length)
forloop(from = 0, until = r1Sub.length) { i =>
if (bases(i) != r1Sub(i) || bases(i) != r2Sub(i)) builder.append('N')
else builder.append(bases(i))
}
builder.toString
}

// update metrics
var basesCorrected = 0
var basesMasked = 0
forloop(from = 0, until = r1Sub.length) { i =>
if (maybeMaskedBases(i) == 'N') basesMasked += 2
else {
if (maybeMaskedBases(i) != r1Sub(i)) basesCorrected += 1
if (maybeMaskedBases(i) != r2Sub(i)) basesCorrected += 1
}
}
if (basesMasked > 0) {
data.templateMetric.masked += 1
data.basesMetric.masked += basesMasked
}
if (basesCorrected > 0) {
data.templateMetric.corrected += 1
data.basesMetric.corrected += basesCorrected
}
data.templateMetric.examined += 1
data.basesMetric.examined += r1Sub.length + r2Sub.length
}
}

private def errorRate(query: String, target: String): Double = {
require(query.length == target.length)
require(query.nonEmpty)
var errors = 0
forloop (from = 0, until = query.length) { i =>
if (query(i) != target(i)) errors += 1
}
errors / query.length
}
}

case class CorrectOverlappingBasesMetric
(
tpe: CountType,
var total: Long = 0,
var overlapping: Long = 0,
var mismatch_length: Long = 0,
var error_rate_too_high: Long = 0,
var examined: Long = 0,
var masked: Long = 0,
var corrected: Long = 0,
) extends Metric {
def +=(other: CorrectOverlappingBasesMetric): CorrectOverlappingBasesMetric = {
require(this.tpe == other.tpe)
this.total += other.total
this.overlapping += other.overlapping
this.mismatch_length += other.mismatch_length
this.error_rate_too_high += other.error_rate_too_high
this.examined += other.examined
this.masked += other.masked
this.corrected += other.corrected
this
}
}

sealed trait CountType extends EnumEntry

object CountType extends FgBioEnum[CountType] {
case object Templates extends CountType
case object Bases extends CountType

override def values: immutable.IndexedSeq[CountType] = findValues
}
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ class CollectDuplexSeqMetrics
ba.iterator.map(r => r[String](this.umiTag).split('-')).foreach { case Array(u1, u2) => umi1s += u2; umi2s += u1 }

val Seq(abConsensusUmi, baConsensusUmi) = Seq(umi1s, umi2s).map(_.result()).map{ umis =>
val consensus = this.consensusBuilder.callConsensus(umis)
val consensus = this.consensusBuilder.callBases(umis)
val metric = this.umiMetricsMap.getOrElseUpdate(consensus, UmiMetric(umi=consensus))
metric.raw_observations += umis.size
metric.unique_observations += 1
Expand Down
65 changes: 45 additions & 20 deletions src/main/scala/com/fulcrumgenomics/umi/SimpleConsensusCaller.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ import scala.collection.immutable.BitSet
* @param errorRatePostLabeling The error probability post to adding the UMI. By default, set to almost surely zero
* @param qError The adjusted error probability. By default set to Q20.
*/
private[umi] class SimpleConsensusCaller(val errorRatePreLabeling: Byte = 90.toByte,
val errorRatePostLabeling: Byte = 90.toByte,
qError: PhredScore = 20.toByte
) {
private[fulcrumgenomics]
class SimpleConsensusCaller(val errorRatePreLabeling: Byte = 90.toByte,
val errorRatePostLabeling: Byte = 90.toByte,
qError: PhredScore = 20.toByte
) {
val pError: LogProbability = LogProbability.fromPhredScore(this.qError)
val pTruth: LogProbability = LogProbability.not(this.pError)

Expand All @@ -55,37 +56,61 @@ private[umi] class SimpleConsensusCaller(val errorRatePreLabeling: Byte = 90.toB
private val DnaBases = Set('A', 'C', 'G', 'T', 'N', 'a', 'c', 'g', 't', 'n')
private val DnaBasesBitSet = BitSet(DnaBases.map(_.toInt).toSeq:_*)

/** Calls a simple consensus sequences from a set of sequences all the same length. */
def callConsensus(sequences: Seq[String]): String = {
/** Calls a simple consensus sequence from a set of sequences all the same length. */
def callBases(bases: Seq[String]): String = callBasesAndQualities(bases)._1

/** Calls a simple consensus sequence with base qualities from a set of sequences all the same length. */
def callBasesAndQualities(sequences: Seq[String], qualities: Seq[Array[PhredScore]] = Seq.empty): (String, Option[String]) = {
require(sequences.nonEmpty, "Can't call consensus on an empty set of sequences!")
require(qualities.isEmpty || sequences.length == qualities.length)

if (sequences.length == 1) sequences.head else {
if (sequences.length == 1) {
(sequences.head, None)
}
else {
require(sequences.forall(_.length == sequences.head.length), "Sequences must all have the same length")
val buffer = new StringBuilder
val firstRead = sequences.head
val readLength = firstRead.length
val consensusBases = new StringBuilder
val consensusQuals = new StringBuilder
val firstRead = sequences.head
val readLength = firstRead.length
val sequencesLength = sequences.length

forloop (from=0, until=readLength) { i =>
forloop (from=0, until=readLength) { baseIndex =>
this.consensusBuilder.reset()
var nonDna = 0
sequences.foreach { sequence =>
val char = sequence.charAt(i)
forloop (from=0, until=sequences.length) { sequenceIndex =>
val sequence = sequences(sequenceIndex)
val char = sequence.charAt(baseIndex)
if (!this.DnaBasesBitSet.contains(char.toInt)) {
nonDna += 1
// verify that all non-DNA bases are the same character
require(firstRead.charAt(i) == char,
s"Sequences must have character '${firstRead.charAt(i)}' at position $i, found '$char'")
require(firstRead.charAt(baseIndex) == char,
s"Sequences must have character '${firstRead.charAt(baseIndex)}' at position $baseIndex, found '$char'")
}
else {
val qual = if (qualities.isEmpty) qError else qualities(sequenceIndex)(baseIndex)
this.consensusBuilder.add(char.toByte, qual)
}
}

val (base: Char, qual: Char) = {
if (nonDna == 0) {
val (_base, _qual) = this.consensusBuilder.call()
(_base.toChar, PhredScore.toFastq(_qual).toChar)
}
else if (nonDna == sequencesLength) {
(firstRead.charAt(baseIndex), PhredScore.toFastq(PhredScore.MinValue).toChar)
} // NB: we have previously verified they are all the same character
else {
throw new IllegalStateException(s"Sequences contained a mix of DNA and non-DNA characters at offset $baseIndex: $sequences")
}
else this.consensusBuilder.add(char.toByte, qError)
}

if (nonDna == 0) buffer.append(this.consensusBuilder.call()._1.toChar)
else if (nonDna == sequencesLength) buffer.append(firstRead.charAt(i)) // NB: we have previously verified they are all the same character
else throw new IllegalStateException(s"Sequences contained a mix of DNA and non-DNA characters at offset $i: $sequences")
consensusBases.append(base)
consensusQuals.append(qual)
}

buffer.toString()
(consensusBases.toString(), Some(consensusQuals.toString()))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ trait UmiConsensusCaller[ConsensusRead <: SimpleRead] {
rec.quals = read.quals
rec(SAMTag.RG.name()) = readGroupId
rec(ConsensusTags.MolecularId) = read.id
if (umis.nonEmpty) rec(ConsensusTags.UmiBases) = this.consensusBuilder.callConsensus(umis)
if (umis.nonEmpty) rec(ConsensusTags.UmiBases) = this.consensusBuilder.callBases(umis)

rec
}
Expand Down
Loading

0 comments on commit f6e9f09

Please sign in to comment.