#!/usr/bin/env python3
import sys
import os
import re
from xopen import xopen
import io
from collections import OrderedDict
[docs]class Splitter:
"""
Split large contigs in smaller ones
"""
def __init__(self, input_f, name_f, output_f, size_c=10000000, query_index="query_split.idx", debug=False):
"""
:param input_f: input fasta file path
:type input_f: str
:param name_f: sample name
:type name_f: str
:param output_f: output fasta file path
:type output_f: str
:param size_c: size of split contigs
:type size_c: int
:param query_index: index file path for query
:type query_index: str
:param debug: True to enable debug mode
:type debug: bool
"""
self.input_f = input_f
self.name_f = name_f
self.size_c = size_c
self.output_f = output_f
self.input_gz = input_f.endswith(".gz")
self.output_gz = output_f.endswith(".gz")
self.out_dir = os.path.dirname(output_f)
self.index_file = os.path.join(self.out_dir, query_index)
self.nb_contigs = 0
self.debug = debug
[docs] def split(self):
"""
Split contigs in smaller ones staff
:return: True if the input Fasta is correct, else False
"""
has_header = False
next_header = False # True if next line must be a header line
with (xopen(self.input_f, mode="r") if self.input_gz else open(self.input_f, mode="r")) as fasta, \
(xopen(self.output_f, mode="w") if self.output_gz else open(self.output_f, mode="w")) as enc, \
open(self.index_file, mode="w") as index_f:
index_f.write(self.name_f + "\n")
chr_name = None
fasta_str = ""
nb_line = 0
for line in fasta:
nb_line += 1
line = line.strip("\n")
if re.match(r"^>.+", line) is not None:
has_header = True
next_header = False
if chr_name is not None and len(fasta_str) > 0:
self.nb_contigs += 1
self.flush_contig(fasta_str, chr_name, self.size_c, enc, index_f)
elif chr_name is not None:
return False, "Error: contig is empty: %s" % chr_name
chr_name = re.split("\s", line[1:])[0]
fasta_str = ""
if self.debug:
print("Parsing contig \"%s\"... " % chr_name, end="")
elif len(line) > 0:
if next_header or re.match(r"^[ATGCKMRYSWBVHDXN.\-]+$", line.upper()) is None:
if next_header:
return False, "Error: new header line expected at line %d" % nb_line
return False, "Error: invalid sequence at line %d" % nb_line
fasta_str += line
elif len(line) == 0:
next_header = True
self.nb_contigs += 1
self.flush_contig(fasta_str, chr_name, self.size_c, enc, index_f)
return has_header, ""
[docs] @staticmethod
def write_contig(name, fasta, o_file):
o_file.write(">%s\n" % name)
f_len = len(fasta)
i = 0
while i<f_len:
j = min (i+60, f_len)
o_file.write(fasta[i:j] + "\n")
i = j
o_file.write("\n")
[docs] @staticmethod
def split_contig(name, sequence, block_sizes):
seq_len = len(sequence)
contigs = OrderedDict()
if seq_len < block_sizes + block_sizes * 0.1: # Only one block size
contigs[name] = sequence
return contigs
i = 0
n = 1
while i < seq_len:
j = i + block_sizes
if (j > seq_len) or ((i + (block_sizes * 0.1)) >= seq_len):
j = seq_len
contigs[name + ("_###_%d" % n)] = sequence[i:j]
i = j
n += 1
return contigs
[docs] def flush_contig(self, fasta_str, chr_name, size_c, enc, index_f):
if len(fasta_str) > 0 and chr_name is not None:
contigs = self.split_contig(chr_name, fasta_str, size_c)
for name, seq in contigs.items():
self.write_contig(name, seq, enc)
index_f.write("%s\t%d\n" % (name, len(seq)))
nb_contigs = len(contigs)
if self.debug:
if nb_contigs == 1:
print("Keeped!")
else:
print("Splited in %d contigs!" % nb_contigs)
[docs]def parse_args():
import argparse
parser = argparse.ArgumentParser(description="Split huge contigs")
parser.add_argument('-i', '--input', type=str, required=True, help="Input fasta file")
parser.add_argument('-n', '--name', type=str, required=True, help="Input fasta name")
parser.add_argument('-s', '--size', type=int, required=False, default=10, help="Max size of contigs (Mb)")
parser.add_argument('-o', '--output', type=str, required=True, help="Output fasta file")
args = parser.parse_args()
if args.size < 0:
parser.error("Max size of contigs must be positive")
return args
def __main__():
args = parse_args()
splitter = Splitter(args.input, args.name, args.output, args.size * 1000000)
return not splitter.split()
if __name__ == '__main__':
sys.exit(__main__())