Building an audiobook generator with personalized translation
Editor's Context
This article is an English adaptation with additional editorial framing for an international audience.
- Terminology and structure were localized for clarity.
- Examples were rewritten for practical readability.
- Technical claims were preserved with source attribution.
Source: original publication

Hello, Habr!
While studying my fourth language, I once again tried to train my biological neural network on books with parallel translation, and after a couple of evenings I abandoned them again. This approach, where every sentence is accompanied by its translation, seems redundant and gets in the way of immersion. In a text version you can at least skip over the translation, but in other formats, such as my favorite audiobooks, it does not work at all.
The most popular form of “educational translation”, which you have probably used too, is interactive: the user follows the text in the original language and reveals translations and explanations of forgotten or new words on their own. Is it possible to combine these approaches, taking the advantages of each, and carry them over to the audiobook format? That is what we will do today.
To implement such partial translation, we could rely on the “complexity” of words, translating only those that are potentially unfamiliar to listeners at, say, A2 level. Users could then adjust the complexity threshold, making the result easier or harder... But all of this rests on a population-wide average, which in reality will not satisfy anyone.
It would be much more interesting to rely on the vocabulary of each specific user and offer translations only for words that are new or have not been encountered for a long time, using spaced repetition. We can do this if we close the entire loop around one user: we process the text based on the user's personal dictionary, generate an audiobook with a personalized translation from it, extend the dictionary with the words from this generation for the following texts, goto 0.
As a first approximation, the task requires text pre-processing, translation, and speech synthesis, all tied together with the bit of logic described in the paragraph above. With existing libraries, this is an adventure of 3 imports and 20 minutes, isn't it? Not really. There will be a lot of code to cover all the important aspects. The full implementation is available to read and try out on GitHub, with the results at the end of the material.
It is worth noting in advance that the author is somewhat superficially immersed in the field of NLP, therefore all comments, tips and suggestions are warmly welcomed, both on the general principle of operation and on individual parts of the implementation.
❯ Translation
Well, let's get started. The first logical step, oddly enough, will be translation, since we must approach the processing step with ready-made sentences in two languages.
Here we can leave the choice to the user by implementing several translation options. Among the existing translation tools, ArgosTranslate and Google Cloud have proven themselves better than the others. The first, although its output is reminiscent of notoriously clumsy translations, is perhaps the best among free local tools. Google Cloud has a limit of 500,000 characters per month on the free plan, but the quality is far higher, although it still has room to grow before it reaches the DeepL level.
The best option would be to use a professional literary translation. The user is expected to provide it along with the original text. Not as a parallel text, otherwise why are we here, but simply as a separate translation. This brings us to the task of sentence alignment, that is, determining which sentences in the original correspond to which sentences in the translation. Of course, simply iterating through zip(original, translation) is not enough. Several sentences in the original can be translated as one, or vice versa. This is especially true of literary translation.
Among the tested alignment tools with pre-trained models, SentenceTransformers with the paraphrase-multilingual-MiniLM-L12-v2 model performed well. Having obtained the similarities between the sentences of the original and the translation with its help, we compare them with each other, looking only at neighboring sentences.
```python
def get_literary_translation_from_file(self, original_sentences: list[str]) -> tuple[list[str], list[str]]:
    with open(config._root_dir / 'input_translation.txt', 'r', encoding='utf-8') as file:
        text = file.read()
    translated_sentences = self.text_processor.get_sentences(text)

    # One joint list: original sentences first, translated sentences after them
    sentences = original_sentences + translated_sentences
    embeddings = self.model.encode(sentences)  # self.model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
    similarities = self.model.similarity(embeddings, embeddings)

    result_idxs = []
    nh, nf = len(original_sentences), len(sentences)
    src_idx, trg_idx = 0, nh
    while src_idx < nh and trg_idx < nf:
        next_src_score = similarities[src_idx + 1][trg_idx] if src_idx + 1 < nh else 0
        next_trg_score = similarities[src_idx][trg_idx + 1] if trg_idx + 1 < nf else 0
        next_both_score = similarities[src_idx + 1][trg_idx + 1] if src_idx + 1 < nh and trg_idx + 1 < nf else 0
        result_idxs.append((src_idx, trg_idx - nh))
        if next_src_score > next_both_score:
            src_idx += 1
        elif next_trg_score > next_both_score:
            trg_idx += 1
        else:
            src_idx += 1
            trg_idx += 1

    # Glue together sentences that share an index on either side
    result = []
    prev_src, prev_trg = -1, -1
    for src, trg in result_idxs:
        if src == prev_src:
            result[-1][1] += f' {translated_sentences[trg]}'
        elif trg == prev_trg:
            result[-1][0] += f' {original_sentences[src]}'
        else:
            result.append([original_sentences[src], translated_sentences[trg]])
        prev_src, prev_trg = src, trg
    return zip(*result)
```
After calling the model, we receive a matrix of similarities between all sentences, not only across languages but also within each language. The scores we need therefore lie in the upper-right quadrant (rows for original sentences, columns for translated ones), through which we “snake”, collecting neighboring sentences.
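To make the “snake” walk easier to follow, here is a minimal sketch on a hand-written similarity matrix, with no model involved. The function name `snake_align` and the toy scores are assumptions made for illustration; the stepping rule mirrors the one in the method above.

```python
def snake_align(sim, n_src):
    """Walk the cross-language block of a joint similarity matrix.

    sim   -- square matrix over [source sentences + target sentences]
    n_src -- number of source sentences (the first n_src rows and columns)
    Returns (source index, target index) pairs; an index repeats when one
    sentence corresponds to several sentences on the other side.
    """
    n_all = len(sim)
    pairs = []
    i, j = 0, n_src  # j indexes the joint matrix, so targets start at n_src
    while i < n_src and j < n_all:
        pairs.append((i, j - n_src))
        down = sim[i + 1][j] if i + 1 < n_src else 0
        right = sim[i][j + 1] if j + 1 < n_all else 0
        diag = sim[i + 1][j + 1] if i + 1 < n_src and j + 1 < n_all else 0
        if down > diag:
            i += 1
        elif right > diag:
            j += 1
        else:
            i += 1
            j += 1
    return pairs


# Toy scores: source 0 matches target 0, source 1 was split into targets 1 and 2.
cross = [[0.9, 0.2, 0.1],
         [0.1, 0.8, 0.7]]
n_src, n_trg = 2, 3
size = n_src + n_trg
sim = [[0.0] * size for _ in range(size)]
for i in range(n_src):
    for j in range(n_trg):
        sim[i][n_src + j] = sim[n_src + j][i] = cross[i][j]

pairs = snake_align(sim, n_src)
print(pairs)
```

The repeated source index in the result is what later causes the two target sentences to be glued into one aligned pair.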
Since we only consider the elements [i+1, j], [i, j+1], and [i+1, j+1], cases where the order of sentences changes radically in translation can lead to erroneous alignments, but ignoring such rare cases lets us avoid regular false-positive matches between completely different parts of the text.
Returning to the more general task, it is also worth carrying the translation over from session to session, which is especially important when calling a translator with usage limits, and it also simply speeds up generation on repeated runs. Overall, our class with these methods looks like this:
```python
import hashlib
import pickle
from functools import cache

from sentence_transformers import SentenceTransformer


class Translator:
    def __init__(self, container) -> None:
        self.text_processor = container.text_processor
        match config.translation_provider:
            case 'GoogleCloud':
                provider = GCTranslateProvider
            case 'Argos':
                provider = ArgosTranslateProvider
        self.provider = provider(config.source_lang, config.target_lang)
        if config.use_translation_file == 1:
            self.model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')

    @cache
    def translate(self, text: str) -> str:
        return self.provider.translate(text)

    @staticmethod
    def _get_stable_hash(text: str) -> str:
        return hashlib.sha256(text.encode('utf-8')).hexdigest()

    def process_sentences(self, original_sentences: list[str]) -> list[str]:
        match config.use_translation_file:
            case 0:  # translate everything with the selected provider
                self.translated = {
                    self._get_stable_hash(src): self.translate(src.rstrip()) for src in original_sentences
                }
            case 1:  # align with a user-supplied literary translation
                result_src, result_trg = self.get_literary_translation_from_file(original_sentences)
                self.translated = {self._get_stable_hash(src): trg for src, trg in zip(result_src, result_trg)}
                return result_src
            case 2:  # reuse a translation saved in a previous session
                with open(config._root_dir / 'temp' / 'translation.pkl', 'rb') as file:
                    self.translated = pickle.load(file)
        return original_sentences

    def save_translation_to_file(self) -> None:
        with open(config._root_dir / 'temp' / 'translation.pkl', 'wb') as file:
            pickle.dump(self.translated, file)

    def get_translated_sentence(self, sentence: str) -> str:
        return self.translated[self._get_stable_hash(sentence)]

    def get_literary_translation_from_file(self, original_sentences: list[str]) -> tuple[list[str], list[str]]:
        …
```
The GCTranslateProvider and ArgosTranslateProvider classes are just thin wrappers with an implemented translation method, nothing remarkable.
In process_sentences() we select the processing path based on the value from the configuration file: direct translation, alignment with the literary translation, or reuse of a previously saved translation. We translate all sentences at once and store them keyed by a stable hash. In this form we write them to the file system or load previously saved ones.
From the main method we also return a list of original sentences, since they may have changed somewhat in the case of alignment with the literary translation.
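The reason for a dedicated “stable” hash can be shown in a few lines. Python's built-in hash() for strings is salted per process by default, so keys built with it would not match after the dictionary is pickled and loaded in a later session, while a SHA-256 digest is the same everywhere. The sketch below is illustrative only: the sample sentence pair and the module-level `translated` dictionary are assumptions, not the project's data.

```python
import hashlib


def stable_hash(text: str) -> str:
    """Digest that is identical in every Python process, unlike built-in hash()."""
    return hashlib.sha256(text.encode('utf-8')).hexdigest()


# Hypothetical translation pair; in the project the values come from the provider.
translated = {stable_hash(src): trg for src, trg in [('Guten Morgen.', 'Good morning.')]}


def get_translated_sentence(sentence: str) -> str:
    # Lookup works in any session because the key depends only on the text
    return translated[stable_hash(sentence)]


print(get_translated_sentence('Guten Morgen.'))
```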
❯ Preparing tokens and embeddings
Now we can move on to processing. For the basic operations we take spaCy and load a separate model for each language (for the English and Russian example, we choose en_core_web_sm and ru_core_news_sm). The choice of models, languages, and a whole bunch of other things is left to the user through a separate config.
Looking ahead a little, in addition to the tokens themselves, which we get from spaCy, we will also need their embeddings. spaCy will not help us here, so let's turn to transformers with the bert-base-multilingual-cased model. Since BERT tokenization is subword-based and we want to work with whole words, after obtaining the embeddings we need to assemble the subwords back into whole words, combining the embedding values of each component. We attach the resulting embeddings to the spaCy tokens, which requires matching up the tokens produced by the two different tokenizers:
```python
def add_tokens_embeddings(self, sentence: str, spacy_tokens: list[Token]) -> None:
    # Map every character position to the index of the spaCy token covering it
    spacy_idx = {j: i for i, t in enumerate(spacy_tokens) for j in range(t.idx, t.idx + len(t.text))}
    tokenized_input = self.embedding_tokenizer(
        sentence, return_tensors='pt', padding=True, truncation=True, return_offsets_mapping=True
    )
    offsets = tokenized_input.pop('offset_mapping')[0]
    with torch.no_grad():
        model_output = self.embedding_model(**tokenized_input, output_hidden_states=True)
    # Average the last four hidden layers
    embeddings = torch.mean(torch.stack(model_output.hidden_states[-4:]), dim=0).squeeze(0)
    bert_tokens = self.embedding_tokenizer.convert_ids_to_tokens(tokenized_input.input_ids[0])

    aggregators = {…}
    aggregator = aggregators[config.embedding_aggregator.lower()](embeddings, len(bert_tokens))
    subword_count = 0
    current_idx = 0
    for i, token in enumerate(bert_tokens):
        if offsets[i][0] == offsets[i][1]:  # special tokens have an empty span
            continue
        start, end = spacy_idx[offsets[i][0].item()], spacy_idx[offsets[i][1].item() - 1]
        if start != end:
            raise RuntimeError(f'Intersect error. Bert "{token}", {offsets[i]} (spacy: {start}, {end})')
        if start == current_idx:
            aggregator.append(i)
            subword_count += 1
            continue
        if subword_count:
            spacy_tokens[current_idx]._.embedding = aggregator.get_final_embedding(subword_count)
        aggregator.start_new_word(i)
        subword_count = 1
        current_idx = start
    if subword_count:
        spacy_tokens[current_idx]._.embedding = aggregator.get_final_embedding(subword_count)
```
We obtain the embeddings, walk through all the BERT tokens, group subwords by the positions of the spaCy tokens, and assign the result to a new token attribute. The general form of the aggregation mechanism:
```python
class BaseAggregator:
    def __init__(self, embeddings, n):
        self.embeddings = embeddings
        self.n = n
        self.current_embeddings = []

    def start_new_word(self, idx):
        self.current_embeddings = [self.embeddings[idx]]

    def append(self, idx):
        self.current_embeddings.append(self.embeddings[idx])

    def get_final_embedding(self, subword_count):
        return torch.mean(torch.stack(self.current_embeddings), dim=0)
```
Some attributes are not used yet, but will be useful later. Here we have ordinary averaging, but this is, of course, not the only approach. We can also apply min/max pooling, selecting only the minimum or maximum values across subwords; weight the tokens through an attention mechanism with L2 normalization; or preprocess the embeddings by centering them.
Unfortunately, after comparing the results, it turned out that none of the above differs strikingly from simple averaging, which generally gives the better result.
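The grouping and pooling steps can be tried without torch on a toy example. The sketch below uses plain Python lists; the subword split of “unbelievable”, the character spans, and the two-dimensional vectors are all made up for illustration, and `group_pieces` and `pool` are hypothetical helper names, not part of the project.

```python
def group_pieces(word_spans, piece_spans):
    """Assign each subword piece to the word whose character span contains it."""
    char_to_word = {c: w for w, (start, end) in enumerate(word_spans) for c in range(start, end)}
    groups = [[] for _ in word_spans]
    for p, (start, end) in enumerate(piece_spans):
        if start == end:  # special tokens such as [CLS] and [SEP] have empty spans
            continue
        groups[char_to_word[start]].append(p)
    return groups


def pool(vectors, how='mean'):
    """Combine subword vectors element-wise into one word vector."""
    columns = list(zip(*vectors))
    if how == 'max':
        return [max(c) for c in columns]
    if how == 'min':
        return [min(c) for c in columns]
    return [sum(c) / len(c) for c in columns]


# "unbelievable story": two words, split (hypothetically) into three pieces
word_spans = [(0, 12), (13, 18)]
piece_spans = [(0, 0), (0, 2), (2, 12), (13, 18), (0, 0)]
piece_vectors = [[0.0, 0.0], [1.0, 4.0], [3.0, 2.0], [5.0, 5.0], [0.0, 0.0]]

groups = group_pieces(word_spans, piece_spans)
word_vectors = [pool([piece_vectors[p] for p in g]) for g in groups]
print(groups, word_vectors)
```

Switching `how` to 'max' or 'min' changes only the final reduction, which is why the aggregators can share one base class.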
Nevertheless, we will keep the listed approaches in the implementation for further testing in practice:
```python
class MaxPooling(BaseAggregator):
    def get_final_embedding(self, _):
        return torch.max(torch.stack(self.current_embeddings), dim=0).values


class MinPooling(BaseAggregator):
    def get_final_embedding(self, _):
        return torch.min(torch.stack(self.current_embeddings), dim=0).values


class Attention(BaseAggregator):
    def __init__(self, embeddings, n):
        super().__init__(embeddings, n)
        self.attention_scores = torch.zeros(self.n, dtype=torch.float32)

    def start_new_word(self, idx):
        super().start_new_word(idx)
        self.attention_scores = torch.zeros(self.n, dtype=torch.float32)
        self.attention_scores[0] = torch.norm(self.embeddings[idx], p=2)

    def append(self, idx):
        super().append(idx)
        attention_score = torch.matmul(self.embeddings[idx], torch.mean(torch.stack(self.current_embeddings), dim=0))
        l2_norm = torch.norm(self.embeddings[idx], p=2)
        self.attention_scores[len(self.current_embeddings) - 1] = attention_score * l2_norm

    def get_final_embedding(self, subword_count):
        if torch.sum(self.attention_scores) != 0:
            attention_weights = torch.nn.functional.softmax(self.attention_scores[:subword_count], dim=0)
            return torch.sum(attention_weights.unsqueeze(1) * torch.stack(self.current_embeddings), dim=0)
        return super().get_final_embedding(subword_count)


class TextProcessing:
    def add_tokens_embeddings(…):
        …
        embeddings = torch.mean(torch.stack(model_output.hidden_states[-4:]), dim=0).squeeze(0)
        bert_tokens = self.embedding_tokenizer.convert_ids_to_tokens(tokenized_input.input_ids[0])

        if config.embedding_preprocessing_center:
            mean_embedding = torch.mean(embeddings, dim=0)
            embeddings = embeddings - mean_embedding

        aggregators = {
            'averaging': Averaging,
            'maxpooling': MaxPooling,
            'minpooling': MinPooling,
            'attention': Attention,
        }
        aggregator = aggregators[config.embedding_aggregator.lower()](embeddings, len(bert_tokens))
        …
```
There is one more important nuance: although the tokenizer used in spaCy is not subword-based, it still splits some complex words into several tokens, and, to complicate matters further, its splits differ from BERT's. In English, this immediately produces an error on all words with apostrophes, on the spelling cannot, and in some other cases. In addition, for our purposes it would be nice to merge sequences of punctuation tokens, as well as hyphenated words, which we want to treat as a single entity. For all of this we need to intervene a little in spaCy's tokenization:
```python
def _merge_tokens(doc: Doc) -> list[Token]:
    spans_to_merge = []
    i = 0
    n = len(doc)
    while i < n:
        token = doc[i]
        cur_text = token.text.lower()
        prev_text = doc[i - 1].text.lower() if i > 0 else ''
        next_text = doc[i + 1].text.lower() if i < n - 1 else ''

        # Tokens starting or ending with an apostrophe are glued to the previous token
        if ({'\'', '’'} & {cur_text[0], cur_text[-1]}) and i > 0:
            spans_to_merge.append(doc[i - 1 : i + 1])
        elif i < n - 1 and (
            (cur_text == 'can' and next_text == 'not')
            or (token.is_punct and doc[i + 1].is_punct)
            or (re.match(r'^\p{Sc}$', cur_text) and re.match(r'^\d{1,3}([., ]\d{2,3})*$', next_text))
            or (re.match(r'^\p{L}+$', cur_text) and re.match(r'^n[\'’]?t$', next_text))
        ):
            spans_to_merge.append(doc[i : i + 2])
        elif 0 < i < n - 1 and re.match(r'^\p{L}-\p{L}$', f'{prev_text[-1]}{cur_text}{next_text[0]}'):
            start = i - 1
            end = i + 2
            # Extend the chain while it continues with "-" followed by a letter.
            # A Token is not subscriptable, so the first character is taken from .text
            while end < n - 1 and re.match(r'^-\p{L}$', f'{doc[end].text}{doc[end + 1].text[0]}'):
                end += 2
            spans_to_merge.append(doc[start:end])
            logging.debug(f'Adding span to merge (complex hyphenated chain): {doc[start:end]}')
            i = end - 1
        i += 1

    filtered_spans = spacy.util.filter_spans(spans_to_merge)
    with doc.retokenize() as retokenizer:
        for span in filtered_spans:
            retokenizer.merge(span)
    return [token for token in doc if not token.is_space]
```
Here and below, the regex package is imported as re, which gives us access to Unicode categories that are not supported by the standard library's native re.
Note the cases where hyphenated words form entire chains, like out-of-the-way. We collect them in full, taking in all subsequent matching tokens, starting from the one that triggered the initial check.
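The chain-collecting idea can be illustrated on a plain list of strings, without spaCy or the regex package. This is a simplified sketch under stated assumptions: `merge_hyphen_chains` is a hypothetical helper, the tokens are assumed to arrive with hyphens as separate items, and str.isalpha() stands in for the \p{L} check.

```python
def merge_hyphen_chains(tokens):
    """Glue word - word - word ... runs into single tokens."""
    merged = []
    i = 0
    while i < len(tokens):
        chain = tokens[i]
        # Keep extending while the next two tokens are a hyphen and a word
        while (
            chain[-1:].isalpha()
            and i + 2 < len(tokens)
            and tokens[i + 1] == '-'
            and tokens[i + 2][:1].isalpha()
        ):
            chain += '-' + tokens[i + 2]
            i += 2
        merged.append(chain)
        i += 1
    return merged


tokens = ['an', 'out', '-', 'of', '-', 'the', '-', 'way', 'place']
print(merge_hyphen_chains(tokens))
```

Unlike the spaCy version, this sketch builds new strings instead of merging spans in place, so character offsets are not preserved.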
❯ Storage Structures
Now that we're almost ready to move on to the basic logic, it's time to describe the structures in which we will store our words. There are two types: lemmas and specific word forms. This gives us greater flexibility in translation and reduces repetition: when we see a word for the first time, we translate it, but if the word form has been encountered before (long enough ago that we would want to add it to the translation), we also look at when we last encountered its parent lemma, and we can drop the word from the translation if a related form was translated recently. Of course, the repetition intervals for lemmas and word forms should be different. The basic representation is the same for both types:
```python
class BaseNode:
    def __init__(self, intervals: tuple[int]) -> None:
        self.intervals = intervals
        self.last_pos = 0
        self.level = 0

    def check_repeat(self, position: int) -> bool:
        if not self.level:
            return True
        return self.intervals[self.level] < position - self.last_pos

    def update(self, new_pos: int) -> None:
        self.level += 1
        self.last_pos = new_pos
```
For our words and lemmas we store the last position at which we encountered them, the number of occurrences, and a reference to the tuple of threshold intervals between repetitions.
Intervals, like many other things, are set in the configuration file. For now their exact values do not matter much, and the test values look like this:
```python
from itertools import pairwise

lemma_intervals = tuple(b - a for a, b in pairwise(int(1.05**i) for i in range(190, 600)))
entity_intervals = tuple(b - a for a, b in pairwise(int(1.1**i) for i in range(190, 600)))
```
Since we store the last position and “level” of reinforcement, and not just the distance to the next repetition, we can change the intervals at any time, and all our repetition distances will correspond to the new realities, without any additional steps to reassign them.
In the BaseNode subclass for word forms, we only need to set the appropriate intervals and add a field in which to store the translation:
```python
class Entity(BaseNode):
    def __init__(self, translation: str) -> None:
        super().__init__(config.entity_intervals)
        self.translation = translation
```
Our lemmas will act as the first-level storage keys, and here we have to remember homonymy and polysemy, because of which different entities could end up on the same path if we stored them under a single lemma. The best solution in terms of complexity versus quality seems to be a combined lemma as the key, f'{source word lemma}-{translation word lemma}', with leaves in the same format, only built from the specific forms:
```python
class LemmaDict(BaseNode):
    def __init__(self) -> None:
        super().__init__(config.lemma_intervals)
        self.children: dict[str, LemmaDict | Entity] = {}

    def add(self, lemma_src: str, text_src: str, lemma_trg: str, text_trg: str) -> tuple['LemmaDict', Entity]:
        lemma_key = f'{lemma_src}-{lemma_trg}'
        if lemma_key not in self.children:
            self.children[lemma_key] = LemmaDict()
        lemma_obj = self.children[lemma_key]
        ent_key = f'{text_src}-{text_trg}'
        if ent_key not in lemma_obj.children:
            lemma_obj.children[ent_key] = Entity(text_trg)
        return lemma_obj, lemma_obj.children[ent_key]
```
Here we inherit directly from BaseNode, and add() returns not only the leaf but also its “parent” lemma, which later lets us check the distance for each of them.
Let's look a little ahead again: in the text we will constantly come across set expressions, named entities, and other word sequences that we do not want to translate word by word. Of course, a story about the music of the caveman Nick and a wish to break a leg will add comedy to the output text, but this is not quite what we need. We could store such entities in LemmaDict as one string for the whole sequence, but it is more logical to store the entire chain as separate lemmas, which also makes it robust to small changes in form. A good way to implement this is a variation of a prefix tree:
```python
class LemmaTrie:
    def __init__(self) -> None:
        self.children: dict[str, LemmaTrie | Entity] = {}

    def search(self, tokens: list['Token'], depth: int = 0, punct_tail: int = 0) -> tuple[Entity | None, int]:
        if not tokens:
            return None, 0
        token = tokens.pop(0)
        if not re.match(config.word_pattern, token.text):
            # Punctuation: skip it, but remember how many such tokens trail the chain
            return self.search(tokens, depth + 1, punct_tail + 1)
        if child := self.children.get(token.lemma_):
            child = child.search(tokens, depth + 1)
        return child if child and child[0] else (self.children.get('#'), depth - punct_tail)

    def add(self, lemmas: list[str], entity: Entity) -> Entity:
        if not lemmas:
            # '#' marks a leaf; an existing leaf is never overwritten
            if '#' not in self.children:
                self.children['#'] = entity
            return self.children['#']
        if lemmas[0] not in self.children:
            self.children[lemmas[0]] = LemmaTrie()
        return self.children[lemmas[0]].add(lemmas[1:], entity)
```
The search takes a list of words (as tokens) and recursively looks for the deepest leaf by lemma, keeping track of the search depth. Since the tokens may also include punctuation, which we cannot simply drop from the list without breaking the mapping back to the text, we just skip it while remembering the “punctuation tail”, which we subtract from the depth counter for the found leaf.
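As a usage-level illustration, here is a simplified prefix tree over plain lemma strings. It is a sketch, not the project's class: `ToyTrie` is a hypothetical name, the leaf is kept in a separate `value` attribute instead of under the '#' key, punctuation handling is omitted, and the sample idiom with its translation is invented.

```python
class ToyTrie:
    def __init__(self):
        self.children = {}  # lemma -> ToyTrie
        self.value = None   # translation stored at the end of a chain

    def add(self, lemmas, value):
        node = self
        for lemma in lemmas:
            node = node.children.setdefault(lemma, ToyTrie())
        if node.value is None:  # keep an existing leaf and its progress
            node.value = value
        return node.value

    def longest_match(self, lemmas):
        """Return (value, depth) of the deepest stored chain that prefixes `lemmas`."""
        node, best = self, (None, 0)
        for depth, lemma in enumerate(lemmas, start=1):
            node = node.children.get(lemma)
            if node is None:
                break
            if node.value is not None:
                best = (node.value, depth)
        return best


trie = ToyTrie()
trie.add(['break', 'a', 'leg'], 'удачи')

print(trie.longest_match(['break', 'a', 'leg', 'tonight']))
print(trie.longest_match(['break', 'a', 'window']))
```

Because the keys are lemmas rather than surface forms, a chain such as “broke a leg” would follow the same path once lemmatized.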
Adding a new leaf walks the whole path of lemmas, creating nodes where they do not yet exist at some level, and at the end tries to attach the passed Entity as a leaf of the tree. If a leaf already exists on this path, we do not overwrite it, otherwise we would lose the “progress” of this entity. We return the leaf, whether new or already present before the call, and use it to check the repetition distance. Lemma distance is not taken into account in this multi-word variant. If the translations of the existing and the passed entity conflict, we give preference to the existing one.
❯ Basic logic
At this point, we obtain a list of sentences, pass them through the translator, and get tokens for each sentence in both languages, together with their embeddings. We encapsulate the sentence tokens of the two languages in a Sentence class, which will hold our main logic.
In the central method, we iterate over the tokens of the original sentence, checking conditions until the first match. First come the conditions for skipping a token: it has already been processed by one of the later methods; it is a punctuation token; or spaCy's NER has marked it as part of an entity that we consider untranslatable, e.g. PERSON or DATE. The categories of untranslatable entities are left editable in the configuration file. In addition, we skip articles, which spaCy also identifies through the value Art in token.morph.get('PronType'). The check token.pos_ == 'DET' could be used here instead, but besides articles it also fires on some quantifiers, as well as demonstrative and possessive pronouns.
```python
def process_tokens(self) -> None:
    for idx_src, token_src in enumerate(self.tokens_src):
        self.entity_counter += 1
        logging.debug(f'Processing token: {token_src}')
        if token_src in self.skip:
            logging.debug('Skipping previously processed token')
        elif not re.match(config.word_pattern, token_src.text):
            self.entity_counter -= 1
            logging.debug('Skipping punctuation')
        elif token_src.ent_type_ in config.untranslatable_entities or 'Art' in token_src.morph.get('PronType'):
            logging.debug(f'Skipping untranslatable entity or article: {token_src.ent_type_} – {token_src.morph.get("PronType")}')
        elif …
```
And finally we begin the actual processing. First, we check whether our lemma_trie contains a sequence starting from the current token. If a sequence is found, we add it to the output after checking the repetition distance:
```python
def process_tokens(self) -> None:
    for idx_src, token_src in enumerate(self.tokens_src):
        …
        elif self.trie_search_and_process(idx_src):
            logging.debug('Found multiword chain')

def trie_search_and_process(self, idx_src: int) -> bool:
    entity, depth = self.container.lemma_trie.search(self.tokens_src[idx_src:])
    if depth > 1:
        self.treat_trie_entity(entity, self.tokens_src[idx_src : idx_src + depth])
        return True
    return False

def treat_trie_entity(self, entity: Entity, tokens_src: list['Token'], tokens_trg=None) -> None:
    if entity.check_repeat(self.entity_counter):
        entity.update(self.entity_counter)
        self.result.append((' '.join(token.text.lower() for token in tokens_src), entity.translation))
    # Mark the tokens as handled whether or not the translation was emitted
    self.skip |= set(tokens_src)
```
Regardless of whether the found sequence needs to be included in the output with its translation, we add these tokens to the skip set for the following iterations, so that part of the entity is not translated again as a separate word.
If no sequence is found, we move on. The next step is to check whether the token is the beginning of a named entity. Fortunately, spaCy has already recognized this as well, and we only have to check the ent_iob_ value of the current and next tokens. If the current one is marked B and the next one I, we can safely add a new entity:
```python
def process_tokens(self) -> None:
    for idx_src, token_src in enumerate(self.tokens_src):
        …
        elif self._is_start_of_named_entity(idx_src):
            self.add_named_entity_to_trie(idx_src)

def _is_start_of_named_entity(self, idx_src: int) -> bool:
    return (
        self.tokens_src[idx_src].ent_iob_ == 'B'
        and len(self.tokens_src) > idx_src + 1
        and self.tokens_src[idx_src + 1].ent_iob_ == 'I'
    )

def add_named_entity_to_trie(self, idx_src: int) -> None:
    seq_tokens = [self.tokens_src[idx_src]]
    for forw_token in self.tokens_src[idx_src + 1 :]:
        if forw_token.ent_iob_ != 'I':
            break
        seq_tokens.append(forw_token)
    translation = self.container.translator.translate(' '.join(token.text for token in seq_tokens))
    entity = self.container.lemma_trie.add([token.lemma_ for token in seq_tokens], Entity(translation))
    self.treat_trie_entity(entity, seq_tokens)
```
We collect the entire named sequence and translate it with a separate call, which is more reliable than looking for alignments in entire sentences. At the end, we add the sequence to our structure and save the result in the current generation.
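The B/I check is easy to experiment with on plain tag lists. In the sketch below the tags are written by hand in the B/I/O scheme that spaCy exposes through ent_iob_; the helper name `entity_end` and the sample sentence are assumptions made for illustration.

```python
def entity_end(iob_tags, start):
    """Return the exclusive end index of a multi-token entity starting at `start`,
    or None if `start` does not open one."""
    if iob_tags[start] != 'B' or start + 1 >= len(iob_tags) or iob_tags[start + 1] != 'I':
        return None
    end = start + 1
    while end < len(iob_tags) and iob_tags[end] == 'I':
        end += 1
    return end


words = ['She', 'moved', 'to', 'New', 'York', 'City', '.']
tags = ['O', 'O', 'O', 'B', 'I', 'I', 'O']

end = entity_end(tags, 3)
entity_text = ' '.join(words[3:end])
print(entity_text)
```

A single-token entity (a B followed by O) returns None here, matching the original check, so such words continue to the ordinary single-word path.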
The next check is membership in the list of stop words, which we can take from nltk.corpus.stopwords.words(lang). This is a token-skipping condition, but we perform it only now because a stop word may be the start of an idiom present in LemmaTrie. Therefore, we first check for multi-word occurrences and only then skip stop words.
The situation with articles is less satisfying here: since we skip them at the very beginning, we may miss cases where they could have been recognized as part of a multi-word sequence and, in some cases, translated differently. But otherwise LemmaTrie would be cluttered with triplicates for a, the, and no article, not to mention languages with a much larger number of articles.
If we have reached this step without matching any of the previous checks, it is time to align individual words, which we move into a separate module. Here we want to obtain sequences of original and translation tokens that correspond to each other, along with a confidence score for this correspondence. For the initial alignment we use simalign. This tool takes the words of two sentences as input and returns a list of index pairs that it considers matching. Let's get these pairs and convert them into a pair of cross-reference dictionaries:
```python
def _align_tokens(self) -> tuple[dict[int, list[int]], dict[int, list[int]]]:
    # self.aligner = SentenceAligner(model='xlm-roberta-base', token_type='bpe', matching_methods=config.alignment_matching_method)
    align = self.aligner.get_word_aligns([t.text for t in self.tokens_src], [t.text for t in self.tokens_trg])
    src_to_trg = defaultdict(list)
    trg_to_src = defaultdict(list)
    for a, b in align[config.alignment_matching_method]:
        src_to_trg[a].append(b)
        trg_to_src[b].append(a)
    return src_to_trg, trg_to_src
```
We need to convert the alignments from plain pairs into dictionaries because our tokens will not always be aligned one-to-one, and we have to handle each case, including the one where no alignment was found for a token.
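To see what the two dictionaries make visible, consider a hand-written list of index pairs instead of real aligner output. The pairs below are invented, and `build_maps` is a hypothetical stand-alone version of the conversion step.

```python
from collections import defaultdict


def build_maps(pairs):
    """Turn (source index, target index) pairs into two lookup dictionaries."""
    src_to_trg = defaultdict(list)
    trg_to_src = defaultdict(list)
    for a, b in pairs:
        src_to_trg[a].append(b)
        trg_to_src[b].append(a)
    return src_to_trg, trg_to_src


# Source 0 -> target 0 (one-to-one), source 1 -> targets 1 and 2 (one-to-many),
# source 2 has no pair, sources 3 and 4 -> target 4 (many-to-one).
pairs = [(0, 0), (1, 1), (1, 2), (3, 4), (4, 4)]
src_to_trg, trg_to_src = build_maps(pairs)

# Use `in` for the membership test: indexing a defaultdict would create the key
unaligned = [i for i in range(5) if i not in src_to_trg]
print(dict(src_to_trg), dict(trg_to_src), unaligned)
```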
We can configure the matching method passed to simalign, choosing whichever works best. Even so, to improve the results we should double-check and refine them ourselves using our own embeddings.
After receiving the dictionary of correspondences, we will create a delegating method for processing transmitted tokens according to their alignment type:
```python
def process_alignment(self, idx_src: int) -> tuple[float, list[Token], list[Token]]:
    if idx_src not in self.src_to_trg:
        return self.treat_not_aligned_token(idx_src)

    to_trg_len = len(self.src_to_trg[idx_src])
    to_src_len = len(self.trg_to_src[self.src_to_trg[idx_src][0]])
    if to_trg_len == 1 and to_src_len == 1:
        return self.one_to_one(idx_src)
    if to_trg_len == 1 and to_src_len > 1:
        return self.many_to_one(idx_src)
    if to_trg_len > 1 and all(len(self.trg_to_src[x]) == 1 for x in self.src_to_trg[idx_src]):
        return self.one_to_many(idx_src)
    return self.many_to_many(idx_src)
```
In the first case, when no alignment was found, we go straight to manual alignment, and here we allow only one-to-one matches, since a broader search would more often lead to an erroneous match. It is better to miss a word once than to produce an incorrect result.
```python
def treat_not_aligned_token(self, idx_src: int) -> tuple[float, list[Token], list[Token]]:
    unaligned_trg_tokens = self._filter_aligned(self.tokens_trg, self.trg_to_src.keys(), reverse=True)
    best_match_idx, best_score = self._find_best_match(unaligned_trg_tokens, self.tokens_src[idx_src])
    # No candidate left (all targets aligned or punctuation): report no match
    if best_match_idx is None:
        return float('-inf'), [], []
    return best_score, [self.tokens_src[idx_src]], [self.tokens_trg[best_match_idx]]
```
The helper methods _filter_aligned and _find_best_match will be useful to us again and again. With the reverse flag, _filter_aligned returns a dictionary, keyed by index, of those tokens of the translated sentence that did not receive a single pair in the simalign alignment. For unaligned tokens, we search only among other unaligned tokens.
```python
@staticmethod  # called as self._filter_aligned(...), so it has to be a static method
def _filter_aligned(tokens: list[Token], alignment: list[int] | tuple[int], reverse=False) -> dict[int, Token]:
    alignment = set(alignment)
    if reverse:
        return {idx: token for idx, token in enumerate(tokens) if idx not in alignment or token.is_punct}
    return {idx: token for idx, token in enumerate(tokens) if idx in alignment or token.is_punct}
```
We also add punctuation tokens to the output to make it easier to work with in one of the following methods.
In _find_best_match we iterate over the tokens of the resulting set, ignoring punctuation, and use cosine similarity as the match score.
```python
def _find_best_match(self, checkable_tokens: dict[int, Token], control_token: Token) -> tuple[int | None, float]:
    best_match_idx, best_score = None, float('-inf')
    for idx, token in checkable_tokens.items():
        if token.is_punct:
            continue
        score = self._cosine_similarity(token._.embedding, control_token._.embedding)
        if score > best_score:
            best_score = score
            best_match_idx = idx
    return best_match_idx, best_score
```
Based on this score, we will accept or reject tokens for the final result in Sentence. For now, let's move on to the cases where simalign did find an alignment.
The simplest case for us is one-to-one:
def one_to_one(self, idx_src: int) -> tuple[float, list[Token], list[Token]]:
    idx_trg = self.src_to_trg[idx_src][0]
    if self.tokens_trg[idx_trg].is_punct:
        return float('-inf'), [], []
    score = self._cosine_similarity(self.tokens_src[idx_src]._.embedding, self.tokens_trg[idx_trg]._.embedding)
    return score, [self.tokens_src[idx_src]], [self.tokens_trg[idx_trg]]
We check that the token is not falsely aligned to punctuation (unfortunately, this does happen) and then simply return the tokens in question together with the cosine similarity of their embeddings.
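The score used in these methods is a cosine similarity: the higher the value, the closer the two embeddings. The helper itself is not shown in the article, so the following is only a sketch of how such a score might be computed, using plain Python lists and toy two-dimensional vectors instead of real embeddings.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two vectors; 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


same = cosine_similarity([1.0, 0.0], [2.0, 0.0])        # same direction
orthogonal = cosine_similarity([1.0, 0.0], [0.0, 3.0])  # unrelated directions
partial = cosine_similarity([1.0, 1.0], [1.0, 0.0])     # 45 degrees apart

print(round(same, 3), round(orthogonal, 3), round(partial, 3))
```

Because the value is bounded by 1 and grows with closeness, it can be compared directly with a user-configured minimum threshold.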
In cases with multiple matches on one side, we add one more simplification to reduce the number of erroneous matches: we do not consider tokens that do not form a single contiguous sequence in the original sentence. A true alignment of that kind is extremely rare, so ignoring it is justified. For this filtering we define another helper method that tries to expand the sequence in both directions from the reference token:
@staticmethod
def _get_token_sequence(tokens: dict[int, Token], idx: int) -> list[Token]:
    if not tokens:
        return []
    seq_tokens = [tokens[idx]]
    # Expand to the left until the first index missing from the selection.
    for i in range(idx - 1, -1, -1):
        if i not in tokens:
            break
        if tokens[i].is_punct:
            continue
        seq_tokens.insert(0, tokens[i])
    # Expand to the right in the same way.
    for i in range(idx + 1, int(max(tokens.keys())) + 1):
        if i not in tokens:
            break
        if tokens[i].is_punct:
            continue
        seq_tokens.append(tokens[i])
    return seq_tokens
Punctuation does not interrupt the sequence, which is why we kept it in _filter_aligned(): a true alignment across punctuation seems the more realistic case, although this point is debatable. Now we can implement the new methods:

def one_to_many(self, idx_src: int) -> tuple[float, list[Token], list[Token]]:
    checkable_tokens = self._filter_aligned(self.tokens_trg, self.src_to_trg[idx_src])
    best_match_idx, best_score = self._find_best_match(checkable_tokens, self.tokens_src[idx_src])
    if best_match_idx is None:
        return float('-inf'), [], []
    seq_tokens_trg = self._get_token_sequence(checkable_tokens, best_match_idx)
    return best_score, [self.tokens_src[idx_src]], seq_tokens_trg

def many_to_one(self, idx_src: int) -> tuple[float, list[Token], list[Token]]:
    idx_trg = self.src_to_trg[idx_src][0]
    checkable_tokens = self._filter_aligned(self.tokens_src, self.trg_to_src[idx_trg])
    best_match_idx, best_score = self._find_best_match(checkable_tokens, self.tokens_trg[idx_trg])
    if best_match_idx is None:
        return float('-inf'), [], []
    seq_tokens_src = self._get_token_sequence(checkable_tokens, best_match_idx)
    return best_score, seq_tokens_src, [self.tokens_trg[idx_trg]]
We leave only those tokens to which an alignment was found, find the best match, and from it we try to build a sequence in two directions using the same aligned tokens.
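The expansion step is easier to follow on a toy input. The sketch below reuses the logic of _get_token_sequence as a free function; Tok is a hypothetical stand-in for a spaCy token, and the sentence is invented for illustration.

```python
from dataclasses import dataclass


@dataclass
class Tok:
    """Hypothetical stand-in for a spaCy token: only the fields used here."""
    text: str
    is_punct: bool = False


def get_token_sequence(tokens, idx):
    """Expand left and right from idx over contiguous indices, skipping punctuation."""
    if not tokens:
        return []
    seq = [tokens[idx]]
    for i in range(idx - 1, -1, -1):
        if i not in tokens:
            break
        if tokens[i].is_punct:
            continue
        seq.insert(0, tokens[i])
    for i in range(idx + 1, max(tokens) + 1):
        if i not in tokens:
            break
        if tokens[i].is_punct:
            continue
        seq.append(tokens[i])
    return seq


# Aligned candidates by index: 2, 3 (a comma) and 4 are contiguous, 6 is detached.
candidates = {
    2: Tok("very"),
    3: Tok(",", is_punct=True),
    4: Tok("old"),
    6: Tok("house"),
}
sequence = [t.text for t in get_token_sequence(candidates, 4)]
print(sequence)
```

The comma at index 3 is skipped without breaking the run, while index 6 is dropped because index 5 is missing from the candidates.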
And the last case is many-to-many. Here the general logic will be the same, but with a couple of additions. First of all, we double search to find the strongest pair of all matches, after which we filter and build a sequence for each language. In the process, we also remember all considered pairs so that when calling from the following tokens, we do not add a new result that includes the already considered one.
def many_to_many(self, idx_src: int) -> tuple[float, list[Token], list[Token]]:
    best_src, best_trg, best_score = None, None, float('-inf')
    for idx_trg in self.src_to_trg[idx_src]:
        src_by_trg = tuple(self.trg_to_src[idx_trg])
        # Skip pairs that were already considered from an earlier source token.
        if (idx_trg, src_by_trg) in self.seen:
            continue
        self.seen.add((idx_trg, src_by_trg))
        checkable_tokens = self._filter_aligned(self.tokens_src, src_by_trg)
        curr_match, curr_score = self._find_best_match(checkable_tokens, self.tokens_trg[idx_trg])
        if curr_score > best_score:
            best_src = curr_match
            best_trg = idx_trg
            best_score = curr_score
    if best_src is None:
        return float('-inf'), [], []
    checkable_tokens_src = self._filter_aligned(self.tokens_src, self.trg_to_src[best_trg])
    checkable_tokens_trg = self._filter_aligned(self.tokens_trg, self.src_to_trg[best_src])
    seq_tokens_src = self._get_token_sequence(checkable_tokens_src, best_src)
    seq_tokens_trg = self._get_token_sequence(checkable_tokens_trg, best_trg)
    return best_score, seq_tokens_src, seq_tokens_trg
Having considered all the cases, we can complete the processing in Sentence, where we have already performed a series of checks and reached the point of obtaining the best alignment for the current token. After receiving the token lists and their match score, we strip leading articles from both lists using the already familiar check 'Art' in token.morph.get('PronType'), since they could have slipped in when the sequence was expanded from other tokens. We then check whether the score passes the minimum threshold (configured by the user) and, depending on the number of remaining source-language tokens, add the result to LemmaTrie if len > 1 or to LemmaDict otherwise, saving it to the current output.

for idx_src, token_src in enumerate(self.tokens_src):
    if …
    …
    score, seq_tokens_src, seq_tokens_trg = self.aligner.process_alignment(idx_src)
    # dropwhile comes from itertools
    seq_tokens_src = list(dropwhile(lambda t: 'Art' in t.morph.get('PronType'), seq_tokens_src))
    seq_tokens_trg = list(dropwhile(lambda t: 'Art' in t.morph.get('PronType'), seq_tokens_trg))
    if not seq_tokens_src or not seq_tokens_trg:
        continue
    if score < config.min_align_weight:
        self.possible_result.append((round(score, 2), seq_tokens_src, seq_tokens_trg))
        logging.debug(f'Rejected after alignment: {score}, {seq_tokens_src}, {seq_tokens_trg}')
        continue
    logging.debug(f'Approved after alignment: {score}, {seq_tokens_src}, {seq_tokens_trg}')
    if len(seq_tokens_src) == 1:
        self.treat_dict_entity(seq_tokens_src, seq_tokens_trg)
    else:
        translation = ' '.join(token.text for token in seq_tokens_trg)
        entity = self.container.lemma_trie.add(
            [token.lemma_ for token in seq_tokens_src], Entity(translation)
        )
        self.treat_trie_entity(entity, seq_tokens_src, seq_tokens_trg)
All that remains is to return the result from Sentence to the main processing and generation loop. For now we return the parts as lists, with flags describing what each part is, which will help us at the synthesis stage. It is important for us to distinguish between the sentences themselves and the translated dictionary.

We add the translated sentence to the output only if there are more than five new words or words to be consolidated, or more than a quarter of the number of tokens in the sentence, but not less than two. To improve the text output, we also cut off a possible tail of line breaks from the original sentence and return it after the dictionary and the possible translation.
def get_results(self) -> list[tuple[int, str | list[tuple[str, str]]]]:
    stripped = self.sentence.rstrip()
    tail = self.sentence[len(stripped) :]
    # Flags: 0 = original sentence, 2 = dictionary, 1 = translated sentence, 3 = tail
    result = [(0, stripped)]
    if self.result:
        result.append((2, self.result))
    if self._translation_line_condition:
        result.append((1, self.translated_sentence))
    if tail:
        result.append((3, tail))
    return result

@property
def _translation_line_condition(self) -> bool:
    n = len(self.result)
    quarter = len(self.tokens_src) // 4
    return n > 5 or 2 < n > quarter
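The chained comparison in _translation_line_condition is compact but easy to misread. The sketch below spells it out as a standalone function with a few worked inputs; the function name and the sample numbers are illustrative only. As written, `2 < n > quarter` requires strictly more than two dictionary entries.

```python
def translation_line_condition(n_results: int, n_tokens: int) -> bool:
    """Equivalent to `n > 5 or 2 < n > quarter` with quarter = n_tokens // 4."""
    quarter = n_tokens // 4
    return n_results > 5 or (n_results > 2 and n_results > quarter)


cases = {
    (6, 40): translation_line_condition(6, 40),  # more than five entries
    (3, 10): translation_line_condition(3, 10),  # 3 > 2 and 3 > 10 // 4
    (3, 16): translation_line_condition(3, 16),  # 3 is not more than 16 // 4
    (2, 4): translation_line_condition(2, 4),    # two entries are never enough
}
for (n, tokens), verdict in cases.items():
    print(f"{n} entries, {tokens} tokens -> translate whole sentence: {verdict}")
```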
At the end of the main processing cycle, we will increase the total counters of tokens and sentences, which will be saved from session to session, to correctly calculate the repetition distance.
self.container.entity_counter = sentence_obj.entity_counter
self.container.sentence_counter += 1
self.output_text.extend(sentence_obj.get_results())
❯ Speech synthesis
What remains is the task of synthesizing the book with our translations. We implement it using various synthesizers, also at the user’s choice.
The simplest known synthesizer is gTTS. This library is a wrapper for accessing the open and free part of the Google Translate API. Nothing is configured, no explicit limits, the output is .mp3.
class GTTSProvider:
    def __init__(self) -> None:
        self.model = gTTS

    def synthesize(self, text: str, lang: str) -> AudioSegment:
        audio_buffer = BytesIO()
        tts = self.model(text=text, lang=lang)
        tts.write_to_fp(audio_buffer)
        audio_buffer.seek(0)
        return AudioSegment.from_mp3(audio_buffer)
Among the local tools, the best one tested was CoquiTTS. This library has really wide functionality, but today we are only interested in direct synthesis. For that we only need to select a multilingual model, for example tts_models/multilingual/multi-dataset/xtts_v2, and a voice. The code is also minimal, we only need the main library methods:

class CoquiTTSProvider:
    def __init__(self) -> None:
        self.model = TTS(model_name=config.synth_model)
        self.voice = config.voice_src

    def synthesize(self, text: str, lang: str) -> AudioSegment:
        audio_buffer = BytesIO()
        self.model.tts_to_file(text=text, file_path=audio_buffer, speaker=self.voice, language=lang)
        audio_buffer.seek(0)
        return AudioSegment.from_wav(audio_buffer)
The third option is Google Cloud, which we already used when creating the translation. It offers far more direct configuration, an impressive list of models and voices, and a couple of other pleasant advantages. It is a potentially paid tool, of course, but the permanent free tier is enough for personal needs: every month, 4 million input characters for standard synthesis models and 1 million each for the WaveNet, Neural2 and Journey models [are these counted separately, making 7M characters in total?]. To work with this API you need to create a GC project and configure access, but that is beyond the scope of this article. Within its scope is the method that returns the synthesized audio:
class GoogleCloudTTSProvider:
    def __init__(self) -> None:
        self.client = texttospeech.TextToSpeechClient()

    def synthesize(self, text: str, lang: str, speed: float) -> AudioSegment:
        input_text = texttospeech.SynthesisInput(text=text)
        audio_config = texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.LINEAR16, speaking_rate=speed
        )
        # Pick the voice by language; the language code is the voice name prefix.
        voice_name = config.voice_src if lang == config.source_lang else config.voice_trg
        lang = voice_name[:5]
        voice = texttospeech.VoiceSelectionParams(language_code=lang, name=voice_name)
        response = self.client.synthesize_speech(input=input_text, voice=voice, audio_config=audio_config)
        audio_buffer = BytesIO()
        audio_buffer.write(response.audio_content)
        audio_buffer.seek(0)
        return AudioSegment.from_wav(audio_buffer)
A not very convenient feature is that such generation is only possible for texts <5K bytes. For larger texts, the synthesis result will be saved in Google Cloud Storage, from where we will have to pull it out additionally. Let's adapt the code to these realities:
def __init__(self) -> None:
    self.client_short = texttospeech.TextToSpeechClient()
    self.client_long = texttospeech.TextToSpeechLongAudioSynthesizeClient()
    self.storage_client = storage.Client()

def synthesize(self, text: str, lang: str, speed: float) -> AudioSegment:
    …
    # The limit applies to bytes, not characters, so measure the encoded text.
    if len(text.encode('utf-8')) > 4990:
        logging.debug('GC TTS: Long audio synthesis')
        return self._synthesize_long(input_text, voice, audio_config)
    return self._synthesize_short(input_text, voice, audio_config)

def _synthesize_short(self, input, voice, audio_config) -> AudioSegment:
    response = self.client_short.synthesize_speech(input=input, voice=voice, audio_config=audio_config)
    audio_buffer = BytesIO()
    audio_buffer.write(response.audio_content)
    audio_buffer.seek(0)
    return AudioSegment.from_wav(audio_buffer)

def _synthesize_long(self, input, voice, audio_config) -> AudioSegment:
    bucket = self.storage_client.bucket('name')
    blob = bucket.blob('audio_output.wav')
    if blob.exists():
        blob.delete()
    parent = f'projects/{config.google_cloud_project_id}/locations/{config.google_cloud_project_location}'
    output_gcs_uri = 'gs://name/audio_output.wav'
    request = texttospeech.SynthesizeLongAudioRequest(
        input=input, voice=voice, audio_config=audio_config, parent=parent, output_gcs_uri=output_gcs_uri
    )
    operation = self.client_long.synthesize_long_audio(request=request)
    result = operation.result(timeout=300)  # wait until the file appears in the bucket
    audio_buffer = BytesIO()
    blob.download_to_file(audio_buffer)
    audio_buffer.seek(0)
    return AudioSegment.from_wav(audio_buffer)
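The reason the check above measures encoded bytes rather than characters is easy to demonstrate: non-Latin scripts take more than one byte per character in UTF-8. A small sketch, with a hypothetical helper name and the same 4990-byte margin as in the code above:

```python
SHORT_LIMIT_BYTES = 4990  # same safety margin as in the synthesize method above


def needs_long_synthesis(text: str, limit: int = SHORT_LIMIT_BYTES) -> bool:
    """True when the UTF-8 encoded text would not fit into a short request."""
    return len(text.encode("utf-8")) > limit


latin = "a" * 4000     # 1 byte per character in UTF-8
cyrillic = "я" * 4000  # 2 bytes per character in UTF-8

latin_bytes = len(latin.encode("utf-8"))
cyrillic_bytes = len(cyrillic.encode("utf-8"))

print(latin_bytes, needs_long_synthesis(latin))
print(cyrillic_bytes, needs_long_synthesis(cyrillic))
```

Two texts of the same character length can therefore land on different synthesis paths.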
And here, for the last time, we need to keep the monthly limits in mind: the 5 GB free storage limit for GCS applies only to storage in the US-WEST1, US-CENTRAL1 and US-EAST1 regions. The free data transfer limit is 100 GB. For personal needs this is, again, more than enough.

We have defined the individual synthesizers; now let's connect them to our task. There are two main synthesis methods: a simple one, which passes on whatever it receives, and one that takes the type of text into account, based on the integer flags we returned from Sentence.get_results():

class SpeechSynthesizer:
    def __init__(self) -> None:
        if not config.speech_synth:
            return
        match config.synth_provider:
            case 'CoquiTTS':
                self.model = CoquiTTSProvider()
            case 'gTTS':
                self.model = GTTSProvider()
            case 'GoogleCloud':
                self.model = GoogleCloudTTSProvider()
            case _:
                raise ValueError(f'Unknown synth_provider value ({config.synth_provider}).')

    def synthesize(self, text: str, lang: str, speed: float = 1.0) -> AudioSegment:
        if text:
            # \P{L} is not supported by the standard re module;
            # this pattern needs the third-party regex package (import regex as re).
            text = re.sub(r'^\P{L}+|[\P{L}?!\.]+$', '', text)
        if not text:
            return False
        return self.model.synthesize(text, lang, speed)

    def synthesize_by_parts(self, parts: list[tuple[int, str | list[tuple[str, str]]]], speed: float) -> AudioSegment:
        audio_buffer = self.silent(0)
        for flag, value in parts:
            match flag:
                case 0:
                    audio = self.synthesize(value, config.source_lang, speed)
                case 1:
                    audio = self.synthesize(value, config.target_lang, speed)
                case 2:
                    # Flatten the (source, translation) pairs into parts flagged 0 and 1.
                    audio = self.synthesize_by_parts(
                        [(i, v) for val in value for i, v in enumerate(val)], config.vocabulary_pronunciation_speed
                    )
                case _:
                    continue
            if audio:
                audio_buffer += audio
            audio = None
        return audio_buffer

    @staticmethod
    def save_audio(audio: AudioSegment, name: str) -> None:
        audio.export(config._root_dir / f'{name}.wav', format='wav')
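The comprehension in case 2 is worth a closer look: enumerate over a (source, translation) pair yields exactly the flags 0 and 1 that the same method already understands. A standalone sketch, with a hypothetical function name and invented word pairs:

```python
def flatten_dictionary(pairs):
    """Turn [(src, trg), ...] into [(0, src), (1, trg), ...] for recursive synthesis."""
    return [(flag, text) for pair in pairs for flag, text in enumerate(pair)]


dictionary = [("Haus", "house"), ("alt", "old")]
parts = flatten_dictionary(dictionary)
print(parts)
```

Each source word comes out flagged 0 (source voice) and each translation flagged 1 (target voice), in reading order.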
For "simple" synthesis we strip insignificant leading and trailing characters from the input text to reduce the number of calls to the synthesizer. To work with audio we use AudioSegment from pydub.

The input for synthesis in parts comes from Sentence.get_results(): original sentences are flagged 0, translated sentences 1, and the dictionary 2. We convert the dictionary into a new list of words or expressions flagged with zeros and ones and process it by calling the same method again.

We will also let the speech speed be set separately for sentences and vocabulary. Of the synthesizers reviewed, only GC supports this natively, so for gTTS and CoquiTTS we add a decorator that calls FFmpeg (permissible speed range: 0.5-2):
def adjust_audio_speed(func):
    @wraps(func)
    def wrapper(self, text: str, lang: str, speed: float) -> AudioSegment:
        audio: AudioSegment = func(self, text, lang)
        if speed != 1:
            # Re-export through FFmpeg with the atempo filter to change the tempo.
            adjusted_audio = BytesIO()
            audio.export(adjusted_audio, format='wav', parameters=['-filter:a', f'atempo={speed}'])
            adjusted_audio.seek(0)
            audio = AudioSegment.from_wav(adjusted_audio)
        return audio
    return wrapper


class GTTSProvider:
    @adjust_audio_speed
    def synthesize(…):
        …


class CoquiTTSProvider:
    @adjust_audio_speed
    def synthesize(…):
        …
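If a speed outside the 0.5-2 range is ever needed, a common FFmpeg technique is to chain several atempo filters whose factors multiply to the target. This is not part of the project; the sketch below, with hypothetical function names, only shows how the parameters list for the export call could be built.

```python
import math


def atempo_chain(speed: float) -> list[float]:
    """Split a tempo factor into steps that each stay within [0.5, 2.0]."""
    if speed <= 0:
        raise ValueError("speed must be positive")
    factors = []
    while speed > 2.0:
        factors.append(2.0)
        speed /= 2.0
    while speed < 0.5:
        factors.append(0.5)
        speed /= 0.5
    factors.append(speed)
    return factors


def atempo_parameters(speed: float) -> list[str]:
    """Build an FFmpeg audio-filter argument list with chained atempo filters."""
    chain = ",".join(f"atempo={factor:g}" for factor in atempo_chain(speed))
    return ["-filter:a", chain]


print(atempo_parameters(0.7))   # a single filter is enough inside the range
print(atempo_parameters(3.0))   # two chained filters for a faster tempo
print(atempo_parameters(0.25))  # two chained filters for a slower tempo
```

The returned list has the same shape as the parameters argument passed to export in the decorator above.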
And since we are working with Google Cloud TTS, it is worth using one more of its advantages: support for SSML, a text markup language for speech synthesis with about fifteen tags.

We will need the most basic structural tags (paragraph, sentence and pause) as well as tags that shape the sound: emphasis, which stresses parts of the text; prosody, which fine-tunes pitch, speed and volume; and voice, which changes the speaker during synthesis without separate calls, something very useful for our multilingual task. We do not need to mark every section of text with a speaker: we still pass the main voice, which is used by default, and the specified voice applies only to the sections wrapped in the voice tag. The standard is somewhat broader and has other interesting features.

Now let's use Jinja2 to create a general SSML output template for each processed sentence:
<p>
  <s>
    {% if sentence_speed != 1 %}
    <prosody rate="{{ (sentence_speed * 100) | int }}%">{{ sentence }}</prosody>
    {% else %}
    {{ sentence }}
    {% endif %}
  </s>
  {% if result %}
  <break strength="strong"/>
  <s>
    <emphasis level="moderate">
      <prosody {% if vocabulary_speed != 1 %}rate="{{ (vocabulary_speed * 100) | int }}%" {% endif %}pitch="+10%" volume="-5dB">
        {% for src, trg in result %}
        {{ src }}-<voice name="{{ voice_trg }}">{{ trg }}</voice>
        {% if not loop.last %}<break strength="medium"/>{% endif %}
        {% endfor %}
      </prosody>
    </emphasis>
  </s>
  {% endif %}
  {% if translated_sentence %}
  <break strength="strong"/>
  <s>
    <voice name="{{ voice_trg }}">
      {% if sentence_speed != 1 %}
      <prosody rate="{{ (sentence_speed * 100) | int }}%">{{ translated_sentence }}</prosody>
      {% else %}
      {{ translated_sentence }}
      {% endif %}
    </voice>
  </s>
  {% endif %}
</p>
<break strength="x-strong"/>
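One thing to keep in mind with such a template: SSML is XML, so book text containing characters like & or < has to be escaped before it is interpolated, unless autoescaping is enabled for the template. The article does not show how the project handles this, so the following is only a sketch using the standard library; the function name is hypothetical.

```python
from xml.sax.saxutils import escape


def ssml_sentence(raw_text: str) -> str:
    """Wrap plain text in an <s> element, escaping XML special characters first."""
    return f"<s>{escape(raw_text)}</s>"


fragment = ssml_sentence("Tom & Jerry: 2 < 3")
print(fragment)
```

The same escape call could be applied to the sentence, the translation and each dictionary entry before they are passed to the template.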
Of course, we will minify the template to reduce the volume of calls to the cloud synthesizer. Now let's add ssml processing along the entire path:
# main.py
class Main:
    def __init__(self) -> None:
        …
        self.output_ssml: list[str] = ['<speak>']
        …

    def process(self, text: str) -> None:
        …
        for sentence_text in sentences:
            …
            self.output_text.extend(sentence_obj.get_results())
            self.output_ssml.append(sentence_obj.get_rendered_ssml())
        self.output_ssml.append('</speak>')

        if config.speech_synth:
            if config.use_ssml:
                self.output_audio = self.container.synthesizer.synthesize(
                    ''.join(self.output_ssml), config.source_lang, 1
                )
            else:
                self.output_audio = self.container.synthesizer.synthesize_by_parts(
                    self.output_text, config.sentence_pronunciation_speed
                )
            self.container.synthesizer.save_audio(self.output_audio, 'multilingual_output')


# sentence_processing.py
from jinja2 import Template


class Sentence:
    def __init__(…):
        …
        with open(config._root_dir / 'src' / 'templates' / 'template.min.ssml', 'r', encoding='utf-8') as file:
            self.template = Template(file.read())

    def get_rendered_ssml(self) -> str:
        if not config.use_ssml:
            return ''
        translated = self.translated_sentence.rstrip() if self._translation_line_condition else ''
        return self.template.render(
            sentence=self.sentence.rstrip(),
            translated_sentence=translated,
            result=self.result,
            voice_trg=config.voice_trg,
            sentence_speed=config.sentence_pronunciation_speed,
            vocabulary_speed=config.vocabulary_pronunciation_speed,
        )


# synthesis.py
class GoogleCloudTTSProvider:
    def synthesize(self, text: str, lang: str, speed: float) -> AudioSegment:
        if config.use_ssml:
            # With SSML, speed and the target voice are set inside the markup.
            input_text = texttospeech.SynthesisInput(ssml=text)
            audio_config = texttospeech.AudioConfig(audio_encoding=texttospeech.AudioEncoding.LINEAR16)
            voice_name = config.voice_src
        else:
            input_text = texttospeech.SynthesisInput(text=text)
            audio_config = texttospeech.AudioConfig(
                audio_encoding=texttospeech.AudioEncoding.LINEAR16, speaking_rate=speed
            )
            voice_name = config.voice_src if lang == config.source_lang else config.voice_trg
        …
And before moving on to completion, you can try to implement one more improvement.
❯ Reusing sound
When synthesizing the dictionary, we send individual words to the synthesizer. Taken out of context, their pronunciation changes noticeably, and in the case of homographs this even introduces extra errors. Besides, single words are synthesized in much worse quality than full sentences, or at least phrases.
If we could reuse words from an already synthesized whole sentence, it would improve the quality of the dictionary audio and reduce the number of synthesizer calls. So let's describe one more path, in which we use MFA (Montreal Forced Aligner). This tool aligns time intervals in the synthesized audio with the words of the text we pass in, which lets us attach a sound to each token at the token-processing stage.
Until now we first built a textual representation of the output and only then called the synthesizer. Using MFA changes this order: by the time the result is generated, the tokens must already have their audio, so we send sentences for synthesis at the very beginning of processing. This also means we will not use SSML on this path and will again assemble the output audio manually.
MFA has no official Python library, so we will interact with it through subprocess and temporary files. You will also first need to download the models and dictionaries for the chosen languages and specify the path to them in the configuration file.

@staticmethod
def _align_audio(text: str, audio_buffer: 'AudioSegment', lang: str) -> None:
    temp = config._root_dir / 'temp'
    # MFA reads a corpus directory: the audio and its transcript share a base name.
    audio_buffer.export(temp / 'temp.wav', format='wav')
    with open(temp / 'temp.txt', 'w', encoding='utf-8') as f:
        f.write(text)
    dict_path = Path(config.mfa_dir) / 'dictionary' / f'{lang}_mfa.dict'
    model_path = Path(config.mfa_dir) / 'acoustic' / f'{lang}_mfa.zip'
    command = ['mfa', 'align', '--clean', '--single_speaker', str(temp), str(dict_path), str(model_path), str(temp)]
    subprocess.run(command, check=True)
The output is a .TextGrid file with the intervals of words and individual phonemes. Using tgt, we go through all the words and return a list of them with the corresponding audio fragments, adding a configurable offset:
@staticmethod
def _split_audio_by_alignment(audio: 'AudioSegment') -> list[dict]:
    textgrid = tgt.io.read_textgrid(config._root_dir / 'temp' / 'temp.TextGrid')
    segments = []
    word_tier = textgrid.get_tier_by_name('words')
    for interval in word_tier.intervals:
        if interval.text.strip():
            # TextGrid times are in seconds, pydub slices are in milliseconds.
            start_time = max(0, interval.start_time * 1000 - config.mfa_start_shift_ms)
            end_time = min(len(audio), interval.end_time * 1000 + config.mfa_end_shift_ms)
            segments.append({'text': interval.text, 'audio': audio[start_time:end_time]})
    return segments
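The interval arithmetic can be checked in isolation. The sketch below converts a word interval from seconds to a millisecond slice, widens it by the configured shifts and clamps it to the audio length; the function name and the sample numbers are illustrative, and rounding to whole milliseconds is an addition of this sketch.

```python
def clip_interval_ms(start_s, end_s, audio_len_ms, start_shift_ms=0, end_shift_ms=0):
    """Convert a (start, end) interval in seconds to a clamped millisecond slice."""
    start = max(0, round(start_s * 1000) - start_shift_ms)
    end = min(audio_len_ms, round(end_s * 1000) + end_shift_ms)
    return start, end


# A one-second clip with a 50 ms shift at the start and 30 ms at the end.
first_word = clip_interval_ms(0.02, 0.5, 1000, start_shift_ms=50, end_shift_ms=30)
last_word = clip_interval_ms(0.75, 0.98, 1000, start_shift_ms=50, end_shift_ms=30)
print(first_word, last_word)
```

The first word is clamped at the beginning of the clip and the last one at its end, so the shifts never push a slice outside the audio.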
Now we can assign these fragments to specific tokens, concatenating the sounds for the aggregated ones:
def _process_mfa_alignment(self) -> None:
    self._align_audio(self.parent.sentence, self.sentence_audio, config.source_full_lang)
    segments_src = self._split_audio_by_alignment(self.sentence_audio)
    self._align_audio(self.parent.translated_sentence, self.translated_audio, config.target_full_lang)
    segments_trg = self._split_audio_by_alignment(self.translated_audio)
    for segments, tokens in ((segments_src, self.tokens_src), (segments_trg, self.tokens_trg)):
        # Walk both lists in parallel: a token may collect several segments.
        i, j = 0, 0
        while i < len(segments) and j < len(tokens):
            if segments[i]['text'] in tokens[j].text.lower():
                tokens[j]._.audio += segments[i]['audio']
                i += 1
            else:
                j += 1
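The two-pointer walk is easier to follow on plain strings. The sketch below reproduces the same loop over segment texts and token texts and records which segments each token would receive; the function name and the sample words are illustrative only.

```python
def assign_segments(segment_texts, token_texts):
    """Map each token index to the list of segment texts matched to it."""
    assigned = {j: [] for j in range(len(token_texts))}
    i = j = 0
    while i < len(segment_texts) and j < len(token_texts):
        if segment_texts[i] in token_texts[j].lower():
            assigned[j].append(segment_texts[i])
            i += 1
        else:
            j += 1
    return assigned


tokens = ["The", "well-known", "house", "."]
segments = ["the", "well", "known", "house"]
matched = assign_segments(segments, tokens)
print(matched)

# Substring matching has a weak spot: a short word can stick to the previous token.
sticky = assign_segments(["the", "he"], ["The", "he"])
print(sticky)
```

In the first case the hyphenated token collects both of its segments. In the second, "he" is a substring of "the" and ends up on the first token, which hints at why the alignment can occasionally sound off.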
The custom attribute Token._.audio, with a default value, is registered at program start when the configuration requires it:

# main.py
class Main:
    def __init__(self):
        …
        if config.speech_synth and config.use_mfa:
            self.output_audio: 'AudioSegment' = self.container.synthesizer.silent(0)
            if not Token.has_extension('audio'):
                Token.set_extension('audio', default=self.container.synthesizer.silent(200))
We encapsulate all this in our own class, where we immediately begin to form the final audio fragment of the processed sentence:
class MFAligner:
    def __init__(self, parent: 'Sentence') -> None:
        if not config.speech_synth or not config.use_mfa:
            self.idle = True
            return
        …
        self.sentence_audio = self.synth.synthesize(self.parent.sentence, config.source_lang, speed)
        self.translated_audio = self.synth.synthesize(self.parent.translated_sentence, config.target_lang, speed)
        # The output starts as a copy of the full sentence audio.
        self.output_audio = self.sentence_audio[:]
        self._process_mfa_alignment()
Each time a new result is added while the Sentence methods are running, we append the audio of the resulting tokens to the output:

def append_mfa_audio_to_output(self, result_src: list['Token'], result_trg: list['Token'] | str) -> None:
    if self.idle:
        return
    for token in result_src:
        self.output_audio += token._.audio
    self.output_audio += self.synth.silent(200)
    if isinstance(result_trg, list):
        for token in result_trg:
            self.output_audio += token._.audio
    else:
        translation_audio = self.synth.synthesize(result_trg, config.target_lang)
        self.output_audio += translation_audio
If the translation arrives not as a list of tokens but as a plain string, we call the synthesizer, as in the main path. We call this method at the points where a result is added, treat_dict_entity and treat_trie_entity of the Sentence class, and at the end define the method that completes processing:

# sentence_processing.py
class Sentence:
    def __init__(…):
        …
        self.mfa_aligner = MFAligner(self)  # -> _process_mfa_alignment()

    def treat_dict_entity(…):
        …
        if lemma.check_repeat(self.entity_counter) and entity.check_repeat(self.entity_counter):
            …
            self.mfa_aligner.append_mfa_audio_to_output(tokens_src, tokens_trg)
        self.skip |= set(tokens_src)

    def treat_trie_entity(…):
        if entity.check_repeat(self.entity_counter):
            entity.update(self.entity_counter)
            self.result.append((' '.join(token.text.lower() for token in tokens_src), entity.translation))
            self.mfa_aligner.append_mfa_audio_to_output(tokens_src, tokens_trg if tokens_trg else entity.translation)
        self.skip |= set(tokens_src)

    def get_result_mfa_audio(self) -> 'AudioSegment':
        return self.mfa_aligner.get_result_audio(self._translation_line_condition)


# mfa_aligner.py
class MFAligner:
    def get_result_audio(self, additional_translation: bool = False) -> 'AudioSegment':
        if additional_translation:
            self.output_audio += self.translated_audio
        return self.output_audio
Now in Main, while processing each sentence, we add its audio to the output, and we additionally wrap the assembly of the final audio in a check of the processing path:

class Main:
    def process(self, text: str) -> None:
        …
        for sentence_text in sentences:
            …
            if config.speech_synth and config.use_mfa:
                self.output_audio += sentence_obj.get_result_mfa_audio()

        if config.speech_synth:
            if not config.use_mfa:
                if config.use_ssml:
                    self.output_audio = self.container.synthesizer.synthesize(
                        ''.join(self.output_ssml), config.source_lang, 1
                    )
                else:
                    self.output_audio = self.container.synthesizer.synthesize_by_parts(
                        self.output_text, config.sentence_pronunciation_speed
                    )
            self.container.synthesizer.save_audio(self.output_audio, 'multilingual_output')
Ready!
And as a final touch, at the end of all processing of the main loop, let’s not forget to save our structures and positions, with a little protection from accidental overwriting, and define a method for loading them for the next generations:
def save_structures(self) -> None:
    filepath = config._root_dir / f'{config.output_storage_filename}.pkl'
    if filepath.exists():
        # Keep one previous copy as protection against accidental overwriting.
        old_filepath = filepath.with_suffix('.pkl.old')
        if old_filepath.exists():
            old_filepath.unlink()
        filepath.rename(old_filepath)
    with open(filepath, 'wb') as file:
        pickle.dump((self.lemma_dict, self.lemma_trie, self.entity_counter, self.sentence_counter), file)

def load_structures(self) -> None:
    with open(config._root_dir / f'{config.input_storage_filename}.pkl', 'rb') as file:
        self.lemma_dict, self.lemma_trie, self.entity_counter, self.sentence_counter = pickle.load(file)
❯ Summary and results
Well, the line count has already exceeded a thousand, so it’s worth briefly going over what we actually did here:
- Translated each sentence, giving the user a choice of translator
- ...or aligned the text with a given monolithic translation, preparing a parallel text
- Prepared tokens for each sentence in two languages, adding embeddings to them and solving the issue of tokenizer compatibility
- Separately implemented storage structures for single-word and multi-word entities, with two levels of storage, by lemmas and by specific forms, for higher-quality output with minimal repetition
- Described the central logic of token processing, most of which is work with alignments, seasoned with our own double-checks and additions
- Based on this, generated the output for each sentence: the original sentence, a dictionary of new words and expressions and those being consolidated, and a translation of the full sentence if necessary
- Separately provided output in SSML format, according to a template we defined, for synthesizers that support it
- Implemented configurable speech synthesis with several tools
- Separately worked out a way to recognize audio fragments of a synthesized sentence and reuse them for specific tokens, to unify the sound and reduce the load on the synthesizer
- Saved our structures for the next generations
So we went through all the important points of the application, leaving methods that do not affect the logic and overall structure outside the brackets. The full application code is still available on github. Now let's see what came out of this:
What is presented here is not the best result, just a random text run with various parameters and an empty dictionary. Speech speed was set to 0.9 for sentences and 0.7 for vocabulary. The remaining adjustable parameters are shown in the video.
The first thing that catches your eye, or rather your ear, is the synthesis of single words. This is a big problem, because voicing them is a mandatory part of our application. But how to get rid of the distortions and rising intonation on these "loners" without passing the context is not yet entirely clear. MFA is an interesting approach, but it increases processing time enormously, and the cut-out words come with a characteristic "twitch". With SSML one could generate text with explicit pauses through <break/> between words so that their fragments are cleaner, but the synthesizer voices such text as separate words, returning us to the initial problem. So the question of improving the sound of single words is still open, as are the comments below, thank you.

Next comes alignment. It is far from ideal, of course, but overall the result inspires confidence. Yes, there are errors in both directions, both missed obvious pairs and false matches of something unrelated, but this can be corrected by tuning the configuration and raising the matching threshold. In addition, this part is completely open to improvement: for example, one could think about post-checks or combining alignment methods.
And now about the central logic - learning and consolidating words. There are no complications here, it just works as it should. Further results are completely dependent on the user, who himself develops a dictionary and sets his own values for repetition intervals.
Below are two versions of the result for O. Henry's short story "A Cosmopolite in a Café". Compare the first two sentences to see how the output changes after just one processed book. The configuration parameters were alignment=inter, aggregator=attention, weight=0.54, with translation and synthesis via GC. The story has 1950 words in total.

In these examples, an empty user dictionary produced 677 new entities, while a dictionary trained on "Moby-Dick" alone produced 489 (new + consolidated). Of course, the first audio will be long, but this difference shows how each newly generated book brings fewer and fewer interruptions for translation while the user's vocabulary grows, which lets us consider today's goal achieved.
We started with a simple idea - to combine parallel translations and audiobooks, without constant distraction and loss of immersion. The partial translation system we created aims to provide each user with a personalized, adaptive experience.
In this approach, the user is not just a passive listener, but an active participant in the process. The vocabulary gradually expands, texts become more understandable, and unknown words are smoothly integrated into the general context. This creates a sense of progress, and with each new book the need for help decreases and the level of confidence increases.
I hope it was interesting or, better yet, useful. Thank you for your attention.
