#!/usr/bin/env python # file find_oligos.py # Copyright (C) 2005 Regents of the University of Colorado # Contact: # James Smagala # smagala@colorado.edu # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """Get all _csvd files from a dir tree, find consensus, find oligo pairs This code was written as a quick hack to get results. Note that mfold is used, but is not included here - you'll need to install it yourself. This code has not been tested under DOS, due to its dependence on mfold. Revision History 050510 James Smagala: Rework of get_primers 050518 JS: Find at least one primer pair for every conserved region 050519 JS: Find multiple pairs per region 050522 JS: Include added entropy information 050824 JS: Cleaned up, OptParse added, copyright etc. added """ import os import re from math import log,log10 from optparse import OptionParser class AlignmentError(Exception): pass class FastaFormatError(Exception): pass def parseFasta (infile): """Convert FASTA files into a comment,sequence pair Requires a file-like object as input, in FASTA form: >comment sequence Yields (comment,sequence) tuples """ # Get the first comment line comment = infile.readline() if comment.startswith('>'): comment = comment[1:].strip() else: raise FastaFormatError, "First character of FASTA file must be '>'" seq = [] def compileSeq(seq): # Ensure that the sequence data exists seq = ''.join(seq) if not seq: raise FastaFormatError, "FASTA record contains no sequence data" return seq for line in infile: # Check for the start of a new record if line.startswith('>'): seq = compileSeq(seq) yield comment,seq seq = [] comment = line[1:].strip() continue # Add the line to the sequence data, removing white space seq.append(''.join(line.split())) seq = compileSeq(seq) yield comment,seq def qconsensus(infile): """Find the majority consensus and 'quality' (entropy) of each position""" def getCounts(*chars): DNA = "ACGT" count = dict([(char,0) for char in DNA]) for char in chars: try: count[char] += 1 except KeyError: if char == None: raise AlignmentError, "Sequence are not all the same length" return count def getChar(count): tmp = [(count[char],char) for char in count] tmp.sort() return tmp[-1][1] def calcEntropy(count): total_chars = sum(count.values()) def getContrib(char): fractional_abundance = float(char) / total_chars try: return fractional_abundance * log(fractional_abundance) / log(2) except OverflowError: return 0 return abs(sum([getContrib(char) for char in count.values()])) # parse the fa flat db into a list seqs = [seq for comment,seq in parseFasta(infile)] # count up the character contribution at each position counts = map(getCounts,*seqs) # count the minimum number of sequences that contributed min_exist = min(sum(count.values()) for count in counts) # get a (base,entropy) pair for each position cons = [(getChar(count),calcEntropy(count)) for count in counts] return cons,min_exist,len(seqs) def mfold(seq,salt): """mfold the sequence""" # make a tmp .fas file for mfold outtmp = open('tmp.fas','w') print >>outtmp, '>temp file for mfolding\n%s' % seq outtmp.close() # mfold the sequence junk,mfold_out = os.popen4('mfold-basic SEQ=%s NA=DNA T=25 NA_CONC=%s' % \ ('tmp.fas',salt),'r') # read and return the Tm for line in mfold_out: if 'Tm' in line: line = ''.join(line.split()) mfold_out.close() return float(line.split('=')[-1]) # if no folding was found, set the Tm low to include this sequence mfold_out.close() return 0.0 def gen_oligos(qcons): """Given a consensus and quality scores, generate oligos""" # constants min_len = 16 max_len = 25 min_gc = 30 max_gc = 70 min_tm = 50 max_sstm = 35 seq_len = len(qcons) # % formamide formamide = 30 # [Na+] equivalents salt = 0.730 # increase the entropy by this amount for each loop iteration entropy_increment = 0.001 # loop through the sizes in reverse order. This will find a pair of oligos # with a one base gap between them sizes = range(max_len*2+1,min_len*2,-1) def get_tm(gc): # predict match Tm return 67 + 16.6 * log10(salt / (1.0 + 0.7 * salt)) \ + 0.8 * gc - 500 / seq_len - 0.5 * formamide def get_gc(seq): gc = filter(lambda x: x in 'GC',seq) return float(len(gc)) * 100 / len(seq) # get all possible oligo pairs all_regions = [] for size in sizes: # find the break point between the two oligos gap_index = size/2 for start in range(0,seq_len-size+1): seq = qcons[start:start+size] # get the entropies for all positions except the gap entropy3 = [record[1] for record in seq[:gap_index]] entropy5 = [record[1] for record in seq[gap_index+1:]] entropy3.sort() entropy5.sort() flag = 0 max_entropy = max((entropy3[-1],entropy5[-1])) entropy2 = min((entropy3[-1],entropy5[-1])) if entropy3[-1] >= entropy3[-2] >= entropy5[-1]: flag = 1 max_entropy = entropy3[-1] entropy2 = entropy3[-2] if entropy5[-1] >= entropy5[-2] >= entropy3[-1]: flag = 1 max_entropy = entropy5[-1] entropy2 = entropy5[-2] if abs(entropy2) < 0.001: flag = 0 # record all information about this pair all_regions.append({\ 'start':start+1,\ 'length':size,\ 'seq3':''.join([record[0] for record in seq[:gap_index]]),\ 'seq5':''.join([record[0] for record in seq[gap_index+1:]]),\ 'max_entropy':max_entropy,\ 'entropy2':entropy2,\ 'flag':flag,\ 'sstm3':None,\ 'sstm5':None}) curr = all_regions[-1] curr['gc3'] = get_gc(curr['seq3']) curr['gc5'] = get_gc(curr['seq5']) curr['tm3'] = get_tm(curr['gc3']) curr['tm5'] = get_tm(curr['gc5']) # reduce the number of regions considered #print 'total: %s' % len(all_regions) all_regions = filter(lambda x: min_gc<=x['gc3']<=max_gc and \ min_gc<=x['gc5']<=max_gc,all_regions) #print 'after gc filter: %s' % len(all_regions) all_regions = filter(lambda x: min_tm<=x['tm3'] and \ min_tm<=x['tm5'],all_regions) #print 'after tm filter: %s' % len(all_regions) # calculate secondary structure Tm once only, until we find something # or until we run out of things to calculate max_entropy = 0 accepted_regions = [] while 1: best_regions = filter(lambda x: max_entropy > x['max_entropy'],all_regions) all_regions = filter(lambda x: max_entropy <= x['max_entropy'],all_regions) #print 'entropy=%0.3f: %s' % (max_entropy,len(best_regions)) for region in best_regions: region['sstm3'] = mfold(region['seq3'],salt) if region['sstm3'] > max_sstm: continue region['sstm5'] = mfold(region['seq5'],salt) if region['sstm5'] > max_sstm: continue accepted_regions.append(region) max_entropy += entropy_increment if not all_regions: return accepted_regions def sort_and_pick(records): """Sort and pick oligos by start position, entropy, and length""" # sort by start position (Decorate Sort Undecorate = DSU) output = [] pos_tmp = [(record['start'],record) for record in records] pos_tmp.sort() # filter out each start position and choose the lowest max entropy while pos_tmp: pos = pos_tmp[0][0] ent_tmp = filter(lambda x: x[0] == pos,pos_tmp) pos_tmp = filter(lambda x: x[0] != pos,pos_tmp) ent_tmp = [(record[1]['max_entropy'],record[1]) for record in ent_tmp] ent = ent_tmp[0][0] len_tmp = filter(lambda x: x[0] == ent,ent_tmp) ent_tmp = filter(lambda x: x[0] != ent,ent_tmp) len_tmp = [(record[1]['length'],record[1]) for record in len_tmp] len_tmp.sort() output.append(len_tmp[-1][-1]) return output def parseDirs(params,root,names): """Get each file, make a directory, find the conserved regions, get oligos""" pattern,outfile = params all_records = [] files = [name for name in names if pattern.match(name)] for file in files: # get consensus with entropy per position, total # seqs, # of seqs that # exist (note this is a minimum, so worst case) try: infile = open(os.path.join(root,file),'r') except IOError,msg: print msg exit(1) try: qcons,exist,total = qconsensus(infile) except AlignmentError,msg: print msg infile.close() exit(1) infile.close() # get possible oligos new_records = gen_oligos(qcons) if not new_records: continue # add info to all new oligos name = file.split('.')[0] name = name.split('_') reg_start = int(name[-2]) reg_end = int(name[-1]) name = '_'.join(name[:-3]) for record in new_records: record['name'] = name record['start'] = record['start'] + reg_start - 1 record['end'] = record['start'] + record['length'] - 1 record['total'] = total record['exist'] = exist record['regions'] = len(files) all_records.extend(new_records) # pick an appropiate collection of oligos for output? picked = sort_and_pick(all_records) for output in picked: print >>outfile, '%s,%s,%s,%s,,%0.3f,%0.3f,%s,,%s,%s,,,,%s,%s,,%0.1f,\ %0.1f,%0.1f,,%0.1f,%0.1f,%0.1f,,%s' % \ (output['name'],output['start'],output['end'],output['length'],\ output['max_entropy'],output['entropy2'],output['flag'],\ output['exist'],output['total'],output['seq3'],output['seq5'],\ output['tm3'],output['sstm3'],output['gc3'],output['tm5'],\ output['sstm5'],output['gc5'],output['regions']) print >>outfile, ',' def parseCommandLine(): """Get and check command line options and arguments.""" usage = "usage: %prog STARTDIR OUTFILE" version = "%prog 0.1" description = "Pick potential capture/label pairs from a conserved region" parser = OptionParser(usage=usage,version=version,description=description) options, args = parser.parse_args() try: dir = ''.join(args[0].split('"')) file = ''.join(args[1].split('"')) except IndexError: parser.error("Missing input") if not os.path.isdir(dir): parser.error("%s is not a directory" % dir) return dir,file def main(): # get command line options start_dir,ofile = parseCommandLine() # open the output file try: outfile = open(ofile,'w') except IOError,msg: print msg exit(1) # all files of interest will match this pattern pattern = re.compile(r'.+_csvd_\d+_\d+.fas') # all operations will take place with tmp as current directory os.chdir('/tmp') # add a header to the outfile print >>outfile, "name,start,end,length,,entropy1,entropy2,flag,,\ # seq exist,# seq total,,select,,3' seq, 5' seq,,3' match tm,3' self tm,\ 3' %gc,,5' match tm, 5' self tm,5' %gc,,# cons reg" # walk the directory structure to generate the file os.path.walk(start_dir,parseDirs,(pattern,outfile)) outfile.close() if __name__ == "__main__": main()