Source code for dgenies.bin.filter_contigs

#!/usr/bin/env python3

import os
import re
import shutil
try:
    from dgenies.bin.index import Index
except ImportError:
    from index import Index
from pathlib import Path
from Bio import SeqIO


[docs]class Filter: """ Filter of a fasta file: remove too small contigs """ def __init__(self, fasta, index_file, type_f, min_filtered=0, split=False, out_fasta=None, replace_fa=False): """ :param fasta: fasta file path :type fasta: str :param index_file: index file path :type index_file: str :param type_f: type of sample (query or target) :type type_f: str :param min_filtered: minimum number of large contigs to allow filtering :type min_filtered: int :param split: are contigs split :type split: bool :param out_fasta: output fasta file path :type out_fasta: str :param replace_fa: if True, replace fasta file :type replace_fa: bool """ self.fasta = fasta self.index_file = index_file self.type_f = type_f self.min_filtered = min_filtered self.split = split if out_fasta is not None: self.out_fasta = out_fasta else: self.out_fasta = os.path.join(os.path.dirname(self.fasta), "filtered_" + os.path.basename(self.fasta)) self.replace_fa = replace_fa
[docs] def filter(self): """ Run filter of contigs :return: True if success, else False :rtype: bool """ f_outs = self._check_filter() if len(f_outs) > 0: self._filter_out(f_outs=f_outs) return True return False
def _check_filter(self): """ Load index of fasta file, and determine contigs which must be removed. Remove them only in the index :return: list of contigs which must be removed :rtype: list """ # Load contigs: name, order, contigs, reversed_c, abs_start, c_len = Index.load(index_file=self.index_file, merge_splits=self.split) # Sort contigs: contigs_order = sorted(order, key=lambda x: -contigs[x]) # Find the N90: sum_l = 0 n95_contig = None n95_value = 0.95 * c_len pos = -1 len_small_contigs = 0 len_1_pct = 0.01 * c_len for contig in contigs_order: pos += 1 sum_l += contigs[contig] if contigs[contig] < len_1_pct: len_small_contigs += contigs[contig] if sum_l >= n95_value: n95_contig = contig if self.type_f == "query" and len_small_contigs >= 0.7 * 0.95 * c_len: Path(os.path.join(os.path.dirname(self.fasta), ".do-sort")).touch() # Min length of contigs min_length = 0.05 * contigs[n95_contig] f_outs = [] breakpoint = None for contig in contigs_order[pos:]: if contigs[contig] < min_length: breakpoint = pos break pos += 1 if breakpoint is not None: f_outs = contigs_order[breakpoint:] if len(f_outs) > self.min_filtered: with open(os.path.join(os.path.dirname(self.fasta), ".filter-" + self.type_f), "w") as list_f: list_f.write("\n".join(f_outs) + "\n") kept = contigs_order[:breakpoint] if self.split: f_outs = [] name, contigs_order_split, contigs, reversed_c, abs_start_split, c_len_split = \ Index.load(index_file=self.index_file, merge_splits=False) kept_s = [] for contig in contigs_order_split: match = re.match(r"(.+)_###_\d+", contig) contig_name = contig if match is not None: contig_name = match.group(1) if contig_name in kept: kept_s.append(contig) else: f_outs.append(contig) kept = kept_s else: kept.sort(key=lambda k: order.index(k)) Index.save(index_file=self.index_file, name=name, contigs=contigs, order=kept, reversed_c=reversed_c) else: f_outs = [] return f_outs def _filter_out(self, f_outs): """ Remove too small contigs from Fasta file :param f_outs: contigs which must be filtered out :type f_outs: list """ sequences = SeqIO.parse(open(self.fasta), "fasta") keeped = (record for record in sequences if record.name not in f_outs) with open(self.out_fasta, "w") as out_fa: SeqIO.write(keeped, out_fa, "fasta") if self.replace_fa: os.remove(self.fasta) shutil.move(self.out_fasta, self.fasta)
if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Filter too small contigs") parser.add_argument('-f', '--fasta', type=str, required=True, help="Input fasta file") parser.add_argument('-i', '--index', type=str, required=True, help="Index file for the fasta file") parser.add_argument('-t', '--type', type=str, required=True, choices=["query", "target"], help="Type of fasta: query or target") parser.add_argument('-m', '--min-filtered', type=int, required=False, default=0, help="Minimum number of filtered contigs") parser.add_argument('-r', "--replace-fasta", action='store_const', const=True, default=False, help="Replace original fasta file") parser.add_argument('-s', "--split", action='store_const', const=True, default=False, help="Is fasta split") args = parser.parse_args() filter_f = Filter(fasta=args.fasta, index_file=args.index, type_f=args.type, min_filtered=args.min_filtered, split=args.split, replace_fa=args.replace_fasta) filter_f.filter()