first commit, based on v1.1.1

This commit is contained in:
hailin 2025-06-27 19:09:04 +08:00
commit 739fe57d9d
41 changed files with 6155 additions and 0 deletions

90
.github/workflows/ci.yml vendored Normal file
View File

@ -0,0 +1,90 @@
name: CI
on:
push:
branches:
- master
tags:
- v*
pull_request:
branches:
- master
jobs:
check-code-format:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python 3.9
uses: actions/setup-python@v5
with:
python-version: 3.9
- name: Install module
run: |
pip install wheel
pip install -e .[dev]
- name: Check code format with Black
run: |
black --check .
- name: Check imports order with isort
run: |
isort --check-only .
- name: Check code style with Flake8
if: ${{ always() }}
run: |
flake8 .
run-tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python 3.9
uses: actions/setup-python@v5
with:
python-version: 3.9
- name: Install module
run: |
pip install wheel
pip install -e .[dev]
- name: Run pytest
run: |
pytest -v tests/
build-and-push-package:
runs-on: ubuntu-latest
needs: [check-code-format, run-tests]
steps:
- uses: actions/checkout@v4
- name: Set up Python 3.9
uses: actions/setup-python@v5
with:
python-version: 3.9
- name: Install dependencies
run: |
pip install wheel
- name: Build package
run: |
python3 setup.py sdist bdist_wheel
- name: Push package on PyPI
if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags')
uses: pypa/gh-action-pypi-publish@release/v1
with:
user: __token__
password: ${{ secrets.PYPI_API_TOKEN }}

15
.gitignore vendored Normal file
View File

@ -0,0 +1,15 @@
# Byte-compiled / Optimized / DLL Files
*.pyc
*.pyo
*.pyd
__pycache__/
# Distribution / Packaging
venv/
# Unit Test
.pytest_cache/
# Ignore IDE, Editor Files
.idea/
.vscode/

31
CONTRIBUTING.md Normal file
View File

@ -0,0 +1,31 @@
# Contributing to faster-whisper
Contributions are welcome! Here are some pointers to help you install the library for development and validate your changes before submitting a pull request.
## Install the library for development
We recommend installing the module in editable mode with the `dev` extra requirements:
```bash
git clone https://github.com/SYSTRAN/faster-whisper.git
cd faster-whisper/
pip install -e .[dev]
```
## Validate the changes before creating a pull request
1. Make sure the existing tests are still passing (and consider adding new tests as well!):
```bash
pytest tests/
```
2. Reformat and validate the code with the following tools:
```bash
black .
isort .
flake8 .
```
These steps are also run automatically in the CI when you open the pull request.

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2023 SYSTRAN
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

4
MANIFEST.in Normal file
View File

@ -0,0 +1,4 @@
include faster_whisper/assets/silero_encoder_v5.onnx
include faster_whisper/assets/silero_decoder_v5.onnx
include requirements.txt
include requirements.conversion.txt

296
README.md Normal file
View File

@ -0,0 +1,296 @@
[![CI](https://github.com/SYSTRAN/faster-whisper/workflows/CI/badge.svg)](https://github.com/SYSTRAN/faster-whisper/actions?query=workflow%3ACI) [![PyPI version](https://badge.fury.io/py/faster-whisper.svg)](https://badge.fury.io/py/faster-whisper)
# Faster Whisper transcription with CTranslate2
**faster-whisper** is a reimplementation of OpenAI's Whisper model using [CTranslate2](https://github.com/OpenNMT/CTranslate2/), which is a fast inference engine for Transformer models.
This implementation is up to 4 times faster than [openai/whisper](https://github.com/openai/whisper) for the same accuracy while using less memory. The efficiency can be further improved with 8-bit quantization on both CPU and GPU.
## Benchmark
### Whisper
For reference, here's the time and memory usage that are required to transcribe [**13 minutes**](https://www.youtube.com/watch?v=0u7tTptBo9I) of audio using different implementations:
* [openai/whisper](https://github.com/openai/whisper)@[v20240930](https://github.com/openai/whisper/tree/v20240930)
* [whisper.cpp](https://github.com/ggerganov/whisper.cpp)@[v1.7.2](https://github.com/ggerganov/whisper.cpp/tree/v1.7.2)
* [transformers](https://github.com/huggingface/transformers)@[v4.46.3](https://github.com/huggingface/transformers/tree/v4.46.3)
* [faster-whisper](https://github.com/SYSTRAN/faster-whisper)@[v1.1.0](https://github.com/SYSTRAN/faster-whisper/tree/v1.1.0)
### Large-v2 model on GPU
| Implementation | Precision | Beam size | Time | VRAM Usage |
| --- | --- | --- | --- | --- |
| openai/whisper | fp16 | 5 | 2m23s | 4708MB |
| whisper.cpp (Flash Attention) | fp16 | 5 | 1m05s | 4127MB |
| transformers (SDPA)[^1] | fp16 | 5 | 1m52s | 4960MB |
| faster-whisper | fp16 | 5 | 1m03s | 4525MB |
| faster-whisper (`batch_size=8`) | fp16 | 5 | 17s | 6090MB |
| faster-whisper | int8 | 5 | 59s | 2926MB |
| faster-whisper (`batch_size=8`) | int8 | 5 | 16s | 4500MB |
### distil-whisper-large-v3 model on GPU
| Implementation | Precision | Beam size | Time | YT Commons WER |
| --- | --- | --- | --- | --- |
| transformers (SDPA) (`batch_size=16`) | fp16 | 5 | 46m12s | 14.801 |
| faster-whisper (`batch_size=16`) | fp16 | 5 | 25m50s | 13.527 |
*GPU Benchmarks are Executed with CUDA 12.4 on a NVIDIA RTX 3070 Ti 8GB.*
[^1]: transformers OOM for any batch size > 1
### Small model on CPU
| Implementation | Precision | Beam size | Time | RAM Usage |
| --- | --- | --- | --- | --- |
| openai/whisper | fp32 | 5 | 6m58s | 2335MB |
| whisper.cpp | fp32 | 5 | 2m05s | 1049MB |
| whisper.cpp (OpenVINO) | fp32 | 5 | 1m45s | 1642MB |
| faster-whisper | fp32 | 5 | 2m37s | 2257MB |
| faster-whisper (`batch_size=8`) | fp32 | 5 | 1m06s | 4230MB |
| faster-whisper | int8 | 5 | 1m42s | 1477MB |
| faster-whisper (`batch_size=8`) | int8 | 5 | 51s | 3608MB |
*Executed with 8 threads on an Intel Core i7-12700K.*
## Requirements
* Python 3.9 or greater
Unlike openai-whisper, FFmpeg does **not** need to be installed on the system. The audio is decoded with the Python library [PyAV](https://github.com/PyAV-Org/PyAV) which bundles the FFmpeg libraries in its package.
### GPU
GPU execution requires the following NVIDIA libraries to be installed:
* [cuBLAS for CUDA 12](https://developer.nvidia.com/cublas)
* [cuDNN 9 for CUDA 12](https://developer.nvidia.com/cudnn)
**Note**: The latest versions of `ctranslate2` only support CUDA 12 and cuDNN 9. For CUDA 11 and cuDNN 8, the current workaround is downgrading to the `3.24.0` version of `ctranslate2`, for CUDA 12 and cuDNN 8, downgrade to the `4.4.0` version of `ctranslate2`, (This can be done with `pip install --force-reinstall ctranslate2==4.4.0` or specifying the version in a `requirements.txt`).
There are multiple ways to install the NVIDIA libraries mentioned above. The recommended way is described in the official NVIDIA documentation, but we also suggest other installation methods below.
<details>
<summary>Other installation methods (click to expand)</summary>
**Note:** For all these methods below, keep in mind the above note regarding CUDA versions. Depending on your setup, you may need to install the _CUDA 11_ versions of libraries that correspond to the CUDA 12 libraries listed in the instructions below.
#### Use Docker
The libraries (cuBLAS, cuDNN) are installed in this official NVIDIA CUDA Docker images: `nvidia/cuda:12.3.2-cudnn9-runtime-ubuntu22.04`.
#### Install with `pip` (Linux only)
On Linux these libraries can be installed with `pip`. Note that `LD_LIBRARY_PATH` must be set before launching Python.
```bash
pip install nvidia-cublas-cu12 nvidia-cudnn-cu12==9.*
export LD_LIBRARY_PATH=`python3 -c 'import os; import nvidia.cublas.lib; import nvidia.cudnn.lib; print(os.path.dirname(nvidia.cublas.lib.__file__) + ":" + os.path.dirname(nvidia.cudnn.lib.__file__))'`
```
#### Download the libraries from Purfview's repository (Windows & Linux)
Purfview's [whisper-standalone-win](https://github.com/Purfview/whisper-standalone-win) provides the required NVIDIA libraries for Windows & Linux in a [single archive](https://github.com/Purfview/whisper-standalone-win/releases/tag/libs). Decompress the archive and place the libraries in a directory included in the `PATH`.
</details>
## Installation
The module can be installed from [PyPI](https://pypi.org/project/faster-whisper/):
```bash
pip install faster-whisper
```
<details>
<summary>Other installation methods (click to expand)</summary>
### Install the master branch
```bash
pip install --force-reinstall "faster-whisper @ https://github.com/SYSTRAN/faster-whisper/archive/refs/heads/master.tar.gz"
```
### Install a specific commit
```bash
pip install --force-reinstall "faster-whisper @ https://github.com/SYSTRAN/faster-whisper/archive/a4f1cc8f11433e454c3934442b5e1a4ed5e865c3.tar.gz"
```
</details>
## Usage
### Faster-whisper
```python
from faster_whisper import WhisperModel
model_size = "large-v3"
# Run on GPU with FP16
model = WhisperModel(model_size, device="cuda", compute_type="float16")
# or run on GPU with INT8
# model = WhisperModel(model_size, device="cuda", compute_type="int8_float16")
# or run on CPU with INT8
# model = WhisperModel(model_size, device="cpu", compute_type="int8")
segments, info = model.transcribe("audio.mp3", beam_size=5)
print("Detected language '%s' with probability %f" % (info.language, info.language_probability))
for segment in segments:
print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
```
**Warning:** `segments` is a *generator* so the transcription only starts when you iterate over it. The transcription can be run to completion by gathering the segments in a list or a `for` loop:
```python
segments, _ = model.transcribe("audio.mp3")
segments = list(segments) # The transcription will actually run here.
```
### Batched Transcription
The following code snippet illustrates how to run batched transcription on an example audio file. `BatchedInferencePipeline.transcribe` is a drop-in replacement for `WhisperModel.transcribe`
```python
from faster_whisper import WhisperModel, BatchedInferencePipeline
model = WhisperModel("turbo", device="cuda", compute_type="float16")
batched_model = BatchedInferencePipeline(model=model)
segments, info = batched_model.transcribe("audio.mp3", batch_size=16)
for segment in segments:
print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
```
### Faster Distil-Whisper
The Distil-Whisper checkpoints are compatible with the Faster-Whisper package. In particular, the latest [distil-large-v3](https://huggingface.co/distil-whisper/distil-large-v3)
checkpoint is intrinsically designed to work with the Faster-Whisper transcription algorithm. The following code snippet
demonstrates how to run inference with distil-large-v3 on a specified audio file:
```python
from faster_whisper import WhisperModel
model_size = "distil-large-v3"
model = WhisperModel(model_size, device="cuda", compute_type="float16")
segments, info = model.transcribe("audio.mp3", beam_size=5, language="en", condition_on_previous_text=False)
for segment in segments:
print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
```
For more information about the distil-large-v3 model, refer to the original [model card](https://huggingface.co/distil-whisper/distil-large-v3).
### Word-level timestamps
```python
segments, _ = model.transcribe("audio.mp3", word_timestamps=True)
for segment in segments:
for word in segment.words:
print("[%.2fs -> %.2fs] %s" % (word.start, word.end, word.word))
```
### VAD filter
The library integrates the [Silero VAD](https://github.com/snakers4/silero-vad) model to filter out parts of the audio without speech:
```python
segments, _ = model.transcribe("audio.mp3", vad_filter=True)
```
The default behavior is conservative and only removes silence longer than 2 seconds. See the available VAD parameters and default values in the [source code](https://github.com/SYSTRAN/faster-whisper/blob/master/faster_whisper/vad.py). They can be customized with the dictionary argument `vad_parameters`:
```python
segments, _ = model.transcribe(
"audio.mp3",
vad_filter=True,
vad_parameters=dict(min_silence_duration_ms=500),
)
```
Vad filter is enabled by default for batched transcription.
### Logging
The library logging level can be configured like this:
```python
import logging
logging.basicConfig()
logging.getLogger("faster_whisper").setLevel(logging.DEBUG)
```
### Going further
See more model and transcription options in the [`WhisperModel`](https://github.com/SYSTRAN/faster-whisper/blob/master/faster_whisper/transcribe.py) class implementation.
## Community integrations
Here is a non exhaustive list of open-source projects using faster-whisper. Feel free to add your project to the list!
* [faster-whisper-server](https://github.com/fedirz/faster-whisper-server) is an OpenAI compatible server using `faster-whisper`. It's easily deployable with Docker, works with OpenAI SDKs/CLI, supports streaming, and live transcription.
* [WhisperX](https://github.com/m-bain/whisperX) is an award-winning Python library that offers speaker diarization and accurate word-level timestamps using wav2vec2 alignment
* [whisper-ctranslate2](https://github.com/Softcatala/whisper-ctranslate2) is a command line client based on faster-whisper and compatible with the original client from openai/whisper.
* [whisper-diarize](https://github.com/MahmoudAshraf97/whisper-diarization) is a speaker diarization tool that is based on faster-whisper and NVIDIA NeMo.
* [whisper-standalone-win](https://github.com/Purfview/whisper-standalone-win) Standalone CLI executables of faster-whisper for Windows, Linux & macOS.
* [asr-sd-pipeline](https://github.com/hedrergudene/asr-sd-pipeline) provides a scalable, modular, end to end multi-speaker speech to text solution implemented using AzureML pipelines.
* [Open-Lyrics](https://github.com/zh-plus/Open-Lyrics) is a Python library that transcribes voice files using faster-whisper, and translates/polishes the resulting text into `.lrc` files in the desired language using OpenAI-GPT.
* [wscribe](https://github.com/geekodour/wscribe) is a flexible transcript generation tool supporting faster-whisper, it can export word level transcript and the exported transcript then can be edited with [wscribe-editor](https://github.com/geekodour/wscribe-editor)
* [aTrain](https://github.com/BANDAS-Center/aTrain) is a graphical user interface implementation of faster-whisper developed at the BANDAS-Center at the University of Graz for transcription and diarization in Windows ([Windows Store App](https://apps.microsoft.com/detail/atrain/9N15Q44SZNS2)) and Linux.
* [Whisper-Streaming](https://github.com/ufal/whisper_streaming) implements real-time mode for offline Whisper-like speech-to-text models with faster-whisper as the most recommended back-end. It implements a streaming policy with self-adaptive latency based on the actual source complexity, and demonstrates the state of the art.
* [WhisperLive](https://github.com/collabora/WhisperLive) is a nearly-live implementation of OpenAI's Whisper which uses faster-whisper as the backend to transcribe audio in real-time.
* [Faster-Whisper-Transcriber](https://github.com/BBC-Esq/ctranslate2-faster-whisper-transcriber) is a simple but reliable voice transcriber that provides a user-friendly interface.
* [Open-dubbing](https://github.com/softcatala/open-dubbing) is open dubbing is an AI dubbing system which uses machine learning models to automatically translate and synchronize audio dialogue into different languages.
## Model conversion
When loading a model from its size such as `WhisperModel("large-v3")`, the corresponding CTranslate2 model is automatically downloaded from the [Hugging Face Hub](https://huggingface.co/Systran).
We also provide a script to convert any Whisper models compatible with the Transformers library. They could be the original OpenAI models or user fine-tuned models.
For example the command below converts the [original "large-v3" Whisper model](https://huggingface.co/openai/whisper-large-v3) and saves the weights in FP16:
```bash
pip install transformers[torch]>=4.23
ct2-transformers-converter --model openai/whisper-large-v3 --output_dir whisper-large-v3-ct2
--copy_files tokenizer.json preprocessor_config.json --quantization float16
```
* The option `--model` accepts a model name on the Hub or a path to a model directory.
* If the option `--copy_files tokenizer.json` is not used, the tokenizer configuration is automatically downloaded when the model is loaded later.
Models can also be converted from the code. See the [conversion API](https://opennmt.net/CTranslate2/python/ctranslate2.converters.TransformersConverter.html).
### Load a converted model
1. Directly load the model from a local directory:
```python
model = faster_whisper.WhisperModel("whisper-large-v3-ct2")
```
2. [Upload your model to the Hugging Face Hub](https://huggingface.co/docs/transformers/model_sharing#upload-with-the-web-interface) and load it from its name:
```python
model = faster_whisper.WhisperModel("username/whisper-large-v3-ct2")
```
## Comparing performance against other implementations
If you are comparing the performance against other Whisper implementations, you should make sure to run the comparison with similar settings. In particular:
* Verify that the same transcription options are used, especially the same beam size. For example in openai/whisper, `model.transcribe` uses a default beam size of 1 but here we use a default beam size of 5.
* Transcription speed is closely affected by the number of words in the transcript, so ensure that other implementations have a similar WER (Word Error Rate) to this one.
* When running on CPU, make sure to set the same number of threads. Many frameworks will read the environment variable `OMP_NUM_THREADS`, which can be set when running your script:
```bash
OMP_NUM_THREADS=4 python3 my_script.py
```

BIN
benchmark/benchmark.m4a Normal file

Binary file not shown.

View File

@ -0,0 +1,80 @@
import argparse
import json
import os
from io import BytesIO
from datasets import load_dataset
from jiwer import wer
from pytubefix import YouTube
from pytubefix.exceptions import VideoUnavailable
from tqdm import tqdm
from transformers.models.whisper.english_normalizer import EnglishTextNormalizer
from faster_whisper import BatchedInferencePipeline, WhisperModel, decode_audio
def url_to_audio(row):
buffer = BytesIO()
yt = YouTube(row["link"])
try:
video = (
yt.streams.filter(only_audio=True, mime_type="audio/mp4")
.order_by("bitrate")
.desc()
.last()
)
video.stream_to_buffer(buffer)
buffer.seek(0)
row["audio"] = decode_audio(buffer)
except VideoUnavailable:
print(f'Failed to download: {row["link"]}')
row["audio"] = []
return row
parser = argparse.ArgumentParser(description="WER benchmark")
parser.add_argument(
"--audio_numb",
type=int,
default=None,
help="Specify the number of validation audio files in the dataset."
" Set to None to retrieve all audio files.",
)
args = parser.parse_args()
with open(os.path.join(os.path.dirname(__file__), "normalizer.json"), "r") as f:
normalizer = EnglishTextNormalizer(json.load(f))
dataset = load_dataset("mobiuslabsgmbh/youtube-commons-asr-eval", streaming=True).map(
url_to_audio
)
model = WhisperModel("large-v3", device="cuda")
pipeline = BatchedInferencePipeline(model, device="cuda")
all_transcriptions = []
all_references = []
# iterate over the dataset and run inference
for i, row in tqdm(enumerate(dataset["test"]), desc="Evaluating..."):
if not row["audio"]:
continue
result, info = pipeline.transcribe(
row["audio"][0],
batch_size=8,
word_timestamps=False,
without_timestamps=True,
)
all_transcriptions.append("".join(segment.text for segment in result))
all_references.append(row["text"][0])
if args.audio_numb and i == (args.audio_numb - 1):
break
# normalize predictions and references
all_transcriptions = [normalizer(transcription) for transcription in all_transcriptions]
all_references = [normalizer(reference) for reference in all_references]
# compute the WER metric
word_error_rate = 100 * wer(hypothesis=all_transcriptions, reference=all_references)
print("WER: %.3f" % word_error_rate)

View File

@ -0,0 +1,94 @@
import argparse
import time
from typing import Callable
import py3nvml.py3nvml as nvml
from memory_profiler import memory_usage
from utils import MyThread, get_logger, inference
logger = get_logger("faster-whisper")
parser = argparse.ArgumentParser(description="Memory benchmark")
parser.add_argument(
"--gpu_memory", action="store_true", help="Measure GPU memory usage"
)
parser.add_argument("--device-index", type=int, default=0, help="GPU device index")
parser.add_argument(
"--interval",
type=float,
default=0.5,
help="Interval at which measurements are collected",
)
args = parser.parse_args()
device_idx = args.device_index
interval = args.interval
def measure_memory(func: Callable[[], None]):
if args.gpu_memory:
logger.info(
"Measuring maximum GPU memory usage on GPU device."
" Make sure to not have additional processes running on the same GPU."
)
# init nvml
nvml.nvmlInit()
handle = nvml.nvmlDeviceGetHandleByIndex(device_idx)
gpu_name = nvml.nvmlDeviceGetName(handle)
gpu_memory_limit = nvml.nvmlDeviceGetMemoryInfo(handle).total >> 20
gpu_power_limit = nvml.nvmlDeviceGetPowerManagementLimit(handle) / 1000.0
info = {"gpu_memory_usage": [], "gpu_power_usage": []}
def _get_gpu_info():
while True:
info["gpu_memory_usage"].append(
nvml.nvmlDeviceGetMemoryInfo(handle).used >> 20
)
info["gpu_power_usage"].append(
nvml.nvmlDeviceGetPowerUsage(handle) / 1000
)
time.sleep(interval)
if stop:
break
return info
stop = False
thread = MyThread(_get_gpu_info, params=())
thread.start()
func()
stop = True
thread.join()
result = thread.get_result()
# shutdown nvml
nvml.nvmlShutdown()
max_memory_usage = max(result["gpu_memory_usage"])
max_power_usage = max(result["gpu_power_usage"])
print("GPU name: %s" % gpu_name)
print("GPU device index: %s" % device_idx)
print(
"Maximum GPU memory usage: %dMiB / %dMiB (%.2f%%)"
% (
max_memory_usage,
gpu_memory_limit,
(max_memory_usage / gpu_memory_limit) * 100,
)
)
print(
"Maximum GPU power usage: %dW / %dW (%.2f%%)"
% (
max_power_usage,
gpu_power_limit,
(max_power_usage / gpu_power_limit) * 100,
)
)
else:
logger.info("Measuring maximum increase of memory usage.")
max_usage = memory_usage(func, max_usage=True, interval=interval)
print("Maximum increase of RAM memory usage: %d MiB" % max_usage)
if __name__ == "__main__":
measure_memory(inference)

1742
benchmark/normalizer.json Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,6 @@
transformers
jiwer
datasets
memory_profiler
py3nvml
pytubefix

View File

@ -0,0 +1,31 @@
import argparse
import timeit
from typing import Callable
from utils import inference
parser = argparse.ArgumentParser(description="Speed benchmark")
parser.add_argument(
"--repeat",
type=int,
default=3,
help="Times an experiment will be run.",
)
args = parser.parse_args()
def measure_speed(func: Callable[[], None]):
# as written in https://docs.python.org/3/library/timeit.html#timeit.Timer.repeat,
# min should be taken rather than the average
runtimes = timeit.repeat(
func,
repeat=args.repeat,
number=10,
)
print(runtimes)
print("Min execution time: %.3fs" % (min(runtimes) / 10.0))
if __name__ == "__main__":
measure_speed(inference)

39
benchmark/utils.py Normal file
View File

@ -0,0 +1,39 @@
import logging
from threading import Thread
from typing import Optional
from faster_whisper import WhisperModel
model_path = "large-v3"
model = WhisperModel(model_path, device="cuda")
def inference():
segments, info = model.transcribe("benchmark.m4a", language="fr")
for segment in segments:
print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
def get_logger(name: Optional[str] = None) -> logging.Logger:
formatter = logging.Formatter("%(levelname)s: %(message)s")
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
class MyThread(Thread):
def __init__(self, func, params):
super(MyThread, self).__init__()
self.func = func
self.params = params
self.result = None
def run(self):
self.result = self.func(*self.params)
def get_result(self):
return self.result

View File

@ -0,0 +1,59 @@
import argparse
import json
import os
from datasets import load_dataset
from jiwer import wer
from tqdm import tqdm
from transformers.models.whisper.english_normalizer import EnglishTextNormalizer
from faster_whisper import WhisperModel
parser = argparse.ArgumentParser(description="WER benchmark")
parser.add_argument(
"--audio_numb",
type=int,
default=None,
help="Specify the number of validation audio files in the dataset."
" Set to None to retrieve all audio files.",
)
args = parser.parse_args()
model_path = "large-v3"
model = WhisperModel(model_path, device="cuda")
# load the dataset with streaming mode
dataset = load_dataset("librispeech_asr", "clean", split="validation", streaming=True)
with open(os.path.join(os.path.dirname(__file__), "normalizer.json"), "r") as f:
normalizer = EnglishTextNormalizer(json.load(f))
def inference(batch):
batch["transcription"] = []
for sample in batch["audio"]:
segments, info = model.transcribe(sample["array"], language="en")
batch["transcription"].append("".join([segment.text for segment in segments]))
batch["reference"] = batch["text"]
return batch
dataset = dataset.map(function=inference, batched=True, batch_size=16)
all_transcriptions = []
all_references = []
# iterate over the dataset and run inference
for i, result in tqdm(enumerate(dataset), desc="Evaluating..."):
all_transcriptions.append(result["transcription"])
all_references.append(result["reference"])
if args.audio_numb and i == (args.audio_numb - 1):
break
# normalize predictions and references
all_transcriptions = [normalizer(transcription) for transcription in all_transcriptions]
all_references = [normalizer(reference) for reference in all_references]
# compute the WER metric
word_error_rate = 100 * wer(hypothesis=all_transcriptions, reference=all_references)
print("WER: %.3f" % word_error_rate)

6
docker/Dockerfile Normal file
View File

@ -0,0 +1,6 @@
FROM nvidia/cuda:12.3.2-cudnn9-runtime-ubuntu22.04
WORKDIR /root
RUN apt-get update -y && apt-get install -y python3-pip
COPY infer.py jfk.flac ./
RUN pip3 install faster-whisper
CMD ["python3", "infer.py"]

7
docker/infer.py Normal file
View File

@ -0,0 +1,7 @@
from faster_whisper import WhisperModel
jfk_path = "jfk.flac"
model = WhisperModel("tiny", device="cuda")
segments, info = model.transcribe(jfk_path, word_timestamps=True)
for segment in segments:
print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))

BIN
docker/jfk.flac Normal file

Binary file not shown.

View File

@ -0,0 +1,14 @@
from faster_whisper.audio import decode_audio
from faster_whisper.transcribe import BatchedInferencePipeline, WhisperModel
from faster_whisper.utils import available_models, download_model, format_timestamp
from faster_whisper.version import __version__
__all__ = [
"available_models",
"decode_audio",
"WhisperModel",
"BatchedInferencePipeline",
"download_model",
"format_timestamp",
"__version__",
]

View File

Binary file not shown.

Binary file not shown.

123
faster_whisper/audio.py Normal file
View File

@ -0,0 +1,123 @@
"""We use the PyAV library to decode the audio: https://github.com/PyAV-Org/PyAV
The advantage of PyAV is that it bundles the FFmpeg libraries so there is no additional
system dependencies. FFmpeg does not need to be installed on the system.
However, the API is quite low-level so we need to manipulate audio frames directly.
"""
import gc
import io
import itertools
from typing import BinaryIO, Union
import av
import numpy as np
def decode_audio(
input_file: Union[str, BinaryIO],
sampling_rate: int = 16000,
split_stereo: bool = False,
):
"""Decodes the audio.
Args:
input_file: Path to the input file or a file-like object.
sampling_rate: Resample the audio to this sample rate.
split_stereo: Return separate left and right channels.
Returns:
A float32 Numpy array.
If `split_stereo` is enabled, the function returns a 2-tuple with the
separated left and right channels.
"""
resampler = av.audio.resampler.AudioResampler(
format="s16",
layout="mono" if not split_stereo else "stereo",
rate=sampling_rate,
)
raw_buffer = io.BytesIO()
dtype = None
with av.open(input_file, mode="r", metadata_errors="ignore") as container:
frames = container.decode(audio=0)
frames = _ignore_invalid_frames(frames)
frames = _group_frames(frames, 500000)
frames = _resample_frames(frames, resampler)
for frame in frames:
array = frame.to_ndarray()
dtype = array.dtype
raw_buffer.write(array)
# It appears that some objects related to the resampler are not freed
# unless the garbage collector is manually run.
# https://github.com/SYSTRAN/faster-whisper/issues/390
# note that this slows down loading the audio a little bit
# if that is a concern, please use ffmpeg directly as in here:
# https://github.com/openai/whisper/blob/25639fc/whisper/audio.py#L25-L62
del resampler
gc.collect()
audio = np.frombuffer(raw_buffer.getbuffer(), dtype=dtype)
# Convert s16 back to f32.
audio = audio.astype(np.float32) / 32768.0
if split_stereo:
left_channel = audio[0::2]
right_channel = audio[1::2]
return left_channel, right_channel
return audio
def _ignore_invalid_frames(frames):
iterator = iter(frames)
while True:
try:
yield next(iterator)
except StopIteration:
break
except av.error.InvalidDataError:
continue
def _group_frames(frames, num_samples=None):
fifo = av.audio.fifo.AudioFifo()
for frame in frames:
frame.pts = None # Ignore timestamp check.
fifo.write(frame)
if num_samples is not None and fifo.samples >= num_samples:
yield fifo.read()
if fifo.samples > 0:
yield fifo.read()
def _resample_frames(frames, resampler):
# Add None to flush the resampler.
for frame in itertools.chain(frames, [None]):
yield from resampler.resample(frame)
def pad_or_trim(array, length: int = 3000, *, axis: int = -1):
"""
Pad or trim the Mel features array to 3000, as expected by the encoder.
"""
if array.shape[axis] > length:
array = array.take(indices=range(length), axis=axis)
if array.shape[axis] < length:
pad_widths = [(0, 0)] * array.ndim
pad_widths[axis] = (0, length - array.shape[axis])
array = np.pad(array, pad_widths)
return array

View File

@ -0,0 +1,230 @@
import numpy as np
class FeatureExtractor:
def __init__(
self,
feature_size=80,
sampling_rate=16000,
hop_length=160,
chunk_length=30,
n_fft=400,
):
self.n_fft = n_fft
self.hop_length = hop_length
self.chunk_length = chunk_length
self.n_samples = chunk_length * sampling_rate
self.nb_max_frames = self.n_samples // hop_length
self.time_per_frame = hop_length / sampling_rate
self.sampling_rate = sampling_rate
self.mel_filters = self.get_mel_filters(
sampling_rate, n_fft, n_mels=feature_size
).astype("float32")
@staticmethod
def get_mel_filters(sr, n_fft, n_mels=128):
# Initialize the weights
n_mels = int(n_mels)
# Center freqs of each FFT bin
fftfreqs = np.fft.rfftfreq(n=n_fft, d=1.0 / sr)
# 'Center freqs' of mel bands - uniformly spaced between limits
min_mel = 0.0
max_mel = 45.245640471924965
mels = np.linspace(min_mel, max_mel, n_mels + 2)
# Fill in the linear scale
f_min = 0.0
f_sp = 200.0 / 3
freqs = f_min + f_sp * mels
# And now the nonlinear scale
min_log_hz = 1000.0 # beginning of log region (Hz)
min_log_mel = (min_log_hz - f_min) / f_sp # same (Mels)
logstep = np.log(6.4) / 27.0 # step size for log region
# If we have vector data, vectorize
log_t = mels >= min_log_mel
freqs[log_t] = min_log_hz * np.exp(logstep * (mels[log_t] - min_log_mel))
fdiff = np.diff(freqs)
ramps = freqs.reshape(-1, 1) - fftfreqs.reshape(1, -1)
lower = -ramps[:-2] / np.expand_dims(fdiff[:-1], axis=1)
upper = ramps[2:] / np.expand_dims(fdiff[1:], axis=1)
# Intersect them with each other and zero, vectorized across all i
weights = np.maximum(np.zeros_like(lower), np.minimum(lower, upper))
# Slaney-style mel is scaled to be approx constant energy per channel
enorm = 2.0 / (freqs[2 : n_mels + 2] - freqs[:n_mels])
weights *= np.expand_dims(enorm, axis=1)
return weights
@staticmethod
def stft(
input_array: np.ndarray,
n_fft: int,
hop_length: int = None,
win_length: int = None,
window: np.ndarray = None,
center: bool = True,
mode: str = "reflect",
normalized: bool = False,
onesided: bool = None,
return_complex: bool = None,
):
# Default initialization for hop_length and win_length
hop_length = hop_length if hop_length is not None else n_fft // 4
win_length = win_length if win_length is not None else n_fft
input_is_complex = np.iscomplexobj(input_array)
# Determine if the output should be complex
return_complex = (
return_complex
if return_complex is not None
else (input_is_complex or (window is not None and np.iscomplexobj(window)))
)
if not return_complex and return_complex is None:
raise ValueError(
"stft requires the return_complex parameter for real inputs."
)
# Input checks
if not np.issubdtype(input_array.dtype, np.floating) and not input_is_complex:
raise ValueError(
"stft: expected an array of floating point or complex values,"
f" got {input_array.dtype}"
)
if input_array.ndim > 2 or input_array.ndim < 1:
raise ValueError(
f"stft: expected a 1D or 2D array, but got {input_array.ndim}D array"
)
# Handle 1D input
if input_array.ndim == 1:
input_array = np.expand_dims(input_array, axis=0)
input_array_1d = True
else:
input_array_1d = False
# Center padding if required
if center:
pad_amount = n_fft // 2
input_array = np.pad(
input_array, ((0, 0), (pad_amount, pad_amount)), mode=mode
)
batch, length = input_array.shape
# Additional input checks
if n_fft <= 0 or n_fft > length:
raise ValueError(
f"stft: expected 0 < n_fft <= {length}, but got n_fft={n_fft}"
)
if hop_length <= 0:
raise ValueError(
f"stft: expected hop_length > 0, but got hop_length={hop_length}"
)
if win_length <= 0 or win_length > n_fft:
raise ValueError(
f"stft: expected 0 < win_length <= n_fft, but got win_length={win_length}"
)
if window is not None:
if window.ndim != 1 or window.shape[0] != win_length:
raise ValueError(
f"stft: expected a 1D window array of size equal to win_length={win_length}, "
f"but got window with size {window.shape}"
)
# Handle padding of the window if necessary
if win_length < n_fft:
left = (n_fft - win_length) // 2
window_ = np.zeros(n_fft, dtype=window.dtype)
window_[left : left + win_length] = window
else:
window_ = window
# Calculate the number of frames
n_frames = 1 + (length - n_fft) // hop_length
# Time to columns
input_array = np.lib.stride_tricks.as_strided(
input_array,
(batch, n_frames, n_fft),
(
input_array.strides[0],
hop_length * input_array.strides[1],
input_array.strides[1],
),
)
if window_ is not None:
input_array = input_array * window_
# FFT and transpose
complex_fft = input_is_complex
onesided = onesided if onesided is not None else not complex_fft
if normalized:
norm = "ortho"
else:
norm = None
if complex_fft:
if onesided:
raise ValueError(
"Cannot have onesided output if window or input is complex"
)
output = np.fft.fft(input_array, n=n_fft, axis=-1, norm=norm)
else:
output = np.fft.rfft(input_array, n=n_fft, axis=-1, norm=norm)
output = output.transpose((0, 2, 1))
if input_array_1d:
output = output.squeeze(0)
return output if return_complex else np.real(output)
def __call__(self, waveform: np.ndarray, padding=160, chunk_length=None):
"""
Compute the log-Mel spectrogram of the provided audio.
"""
if chunk_length is not None:
self.n_samples = chunk_length * self.sampling_rate
self.nb_max_frames = self.n_samples // self.hop_length
if waveform.dtype is not np.float32:
waveform = waveform.astype(np.float32)
if padding:
waveform = np.pad(waveform, (0, padding))
window = np.hanning(self.n_fft + 1)[:-1].astype("float32")
stft = self.stft(
waveform,
self.n_fft,
self.hop_length,
window=window,
return_complex=True,
).astype("complex64")
magnitudes = np.abs(stft[..., :-1]) ** 2
mel_spec = self.mel_filters @ magnitudes
log_spec = np.log10(np.clip(mel_spec, a_min=1e-10, a_max=None))
log_spec = np.maximum(log_spec, log_spec.max() - 8.0)
log_spec = (log_spec + 4.0) / 4.0
return log_spec

314
faster_whisper/tokenizer.py Normal file
View File

@ -0,0 +1,314 @@
import string
from functools import cached_property
from typing import List, Optional, Tuple
import tokenizers
class Tokenizer:
"""Simple wrapper around a tokenizers.Tokenizer."""
def __init__(
self,
tokenizer: tokenizers.Tokenizer,
multilingual: bool,
task: Optional[str] = None,
language: Optional[str] = None,
):
self.tokenizer = tokenizer
if multilingual:
if task not in _TASKS:
raise ValueError(
"'%s' is not a valid task (accepted tasks: %s)"
% (task, ", ".join(_TASKS))
)
if language not in _LANGUAGE_CODES:
raise ValueError(
"'%s' is not a valid language code (accepted language codes: %s)"
% (language, ", ".join(_LANGUAGE_CODES))
)
self.task = self.tokenizer.token_to_id("<|%s|>" % task)
self.language = self.tokenizer.token_to_id("<|%s|>" % language)
self.language_code = language
else:
self.task = None
self.language = None
self.language_code = "en"
@cached_property
def transcribe(self) -> int:
return self.tokenizer.token_to_id("<|transcribe|>")
@cached_property
def translate(self) -> int:
return self.tokenizer.token_to_id("<|translate|>")
@cached_property
def sot(self) -> int:
return self.tokenizer.token_to_id("<|startoftranscript|>")
@cached_property
def sot_lm(self) -> int:
return self.tokenizer.token_to_id("<|startoflm|>")
@cached_property
def sot_prev(self) -> int:
return self.tokenizer.token_to_id("<|startofprev|>")
@cached_property
def eot(self) -> int:
return self.tokenizer.token_to_id("<|endoftext|>")
@cached_property
def no_timestamps(self) -> int:
return self.tokenizer.token_to_id("<|notimestamps|>")
@property
def timestamp_begin(self) -> int:
return self.no_timestamps + 1
@property
def sot_sequence(self) -> List[int]:
sequence = [self.sot]
if self.language is not None:
sequence.append(self.language)
if self.task is not None:
sequence.append(self.task)
return sequence
def encode(self, text: str) -> List[int]:
return self.tokenizer.encode(text, add_special_tokens=False).ids
def decode(self, tokens: List[int]) -> str:
text_tokens = [token for token in tokens if token < self.eot]
return self.tokenizer.decode(text_tokens)
def decode_with_timestamps(self, tokens: List[int]) -> str:
outputs = [[]]
for token in tokens:
if token >= self.timestamp_begin:
timestamp = f"<|{(token - self.timestamp_begin) * 0.02:.2f}|>"
outputs.append(timestamp)
outputs.append([])
else:
outputs[-1].append(token)
return "".join(
[s if isinstance(s, str) else self.tokenizer.decode(s) for s in outputs]
)
@cached_property
def non_speech_tokens(self) -> Tuple[int]:
"""
Returns the list of tokens to suppress in order to avoid any speaker tags or non-speech
annotations, to prevent sampling texts that are not actually spoken in the audio, e.g.
-
- ( SPEAKING FOREIGN LANGUAGE )
- [DAVID] Hey there,
keeping basic punctuations like commas, periods, question marks, exclamation points, etc.
"""
symbols = list('"#()*+/:;<=>@[\\]^_`{|}~「」『』')
symbols += (
"<< >> <<< >>> -- --- -( -[ (' (\" (( )) ((( ))) [[ ]] {{ }} ♪♪ ♪♪♪".split()
)
# symbols that may be a single token or multiple tokens depending on the tokenizer.
# In case they're multiple tokens, suppress the first token, which is safe because:
# These are between U+2640 and U+267F miscellaneous symbols that are okay to suppress
# in generations, and in the 3-byte UTF-8 representation they share the first two bytes.
miscellaneous = set("♩♪♫♬♭♮♯")
assert all(0x2640 <= ord(c) <= 0x267F for c in miscellaneous)
# allow hyphens "-" and single quotes "'" between words, but not at the beginning of a word
result = {self.encode(" -")[0], self.encode(" '")[0]}
for symbol in symbols + list(miscellaneous):
for tokens in [
self.encode(symbol),
self.encode(" " + symbol),
]:
if len(tokens) == 1 or symbol in miscellaneous:
result.add(tokens[0])
return tuple(sorted(result))
def split_to_word_tokens(
self, tokens: List[int]
) -> Tuple[List[str], List[List[int]]]:
if self.language_code in {"zh", "ja", "th", "lo", "my", "yue"}:
# These languages don't typically use spaces, so it is difficult to split words
# without morpheme analysis. Here, we instead split words at any
# position where the tokens are decoded as valid unicode points
return self.split_tokens_on_unicode(tokens)
return self.split_tokens_on_spaces(tokens)
def split_tokens_on_unicode(
self, tokens: List[int]
) -> Tuple[List[str], List[List[int]]]:
decoded_full = self.decode_with_timestamps(tokens)
replacement_char = "\ufffd"
words = []
word_tokens = []
current_tokens = []
unicode_offset = 0
for token in tokens:
current_tokens.append(token)
decoded = self.decode_with_timestamps(current_tokens)
try:
replacement_char_index = decoded.index(replacement_char)
replacement_char_index += unicode_offset
except ValueError:
replacement_char_index = None
if replacement_char_index is None or (
replacement_char_index < len(decoded_full)
and decoded_full[replacement_char_index] == replacement_char
):
words.append(decoded)
word_tokens.append(current_tokens)
current_tokens = []
unicode_offset += len(decoded)
return words, word_tokens
def split_tokens_on_spaces(
self, tokens: List[int]
) -> Tuple[List[str], List[List[int]]]:
subwords, subword_tokens_list = self.split_tokens_on_unicode(tokens)
words = []
word_tokens = []
for subword, subword_tokens in zip(subwords, subword_tokens_list):
special = subword_tokens[0] >= self.eot
with_space = subword.startswith(" ")
punctuation = subword.strip() in string.punctuation
if special or with_space or punctuation or len(words) == 0:
words.append(subword)
word_tokens.append(subword_tokens)
else:
words[-1] = words[-1] + subword
word_tokens[-1].extend(subword_tokens)
return words, word_tokens
_TASKS = (
"transcribe",
"translate",
)
_LANGUAGE_CODES = (
"af",
"am",
"ar",
"as",
"az",
"ba",
"be",
"bg",
"bn",
"bo",
"br",
"bs",
"ca",
"cs",
"cy",
"da",
"de",
"el",
"en",
"es",
"et",
"eu",
"fa",
"fi",
"fo",
"fr",
"gl",
"gu",
"ha",
"haw",
"he",
"hi",
"hr",
"ht",
"hu",
"hy",
"id",
"is",
"it",
"ja",
"jw",
"ka",
"kk",
"km",
"kn",
"ko",
"la",
"lb",
"ln",
"lo",
"lt",
"lv",
"mg",
"mi",
"mk",
"ml",
"mn",
"mr",
"ms",
"mt",
"my",
"ne",
"nl",
"nn",
"no",
"oc",
"pa",
"pl",
"ps",
"pt",
"ro",
"ru",
"sa",
"sd",
"si",
"sk",
"sl",
"sn",
"so",
"sq",
"sr",
"su",
"sv",
"sw",
"ta",
"te",
"tg",
"th",
"tk",
"tl",
"tr",
"tt",
"uk",
"ur",
"uz",
"vi",
"yi",
"yo",
"zh",
"yue",
)

1898
faster_whisper/transcribe.py Normal file

File diff suppressed because it is too large Load Diff

159
faster_whisper/utils.py Normal file
View File

@ -0,0 +1,159 @@
import logging
import os
import re
from typing import List, Optional
import huggingface_hub
import requests
from tqdm.auto import tqdm
_MODELS = {
"tiny.en": "Systran/faster-whisper-tiny.en",
"tiny": "Systran/faster-whisper-tiny",
"base.en": "Systran/faster-whisper-base.en",
"base": "Systran/faster-whisper-base",
"small.en": "Systran/faster-whisper-small.en",
"small": "Systran/faster-whisper-small",
"medium.en": "Systran/faster-whisper-medium.en",
"medium": "Systran/faster-whisper-medium",
"large-v1": "Systran/faster-whisper-large-v1",
"large-v2": "Systran/faster-whisper-large-v2",
"large-v3": "Systran/faster-whisper-large-v3",
"large": "Systran/faster-whisper-large-v3",
"distil-large-v2": "Systran/faster-distil-whisper-large-v2",
"distil-medium.en": "Systran/faster-distil-whisper-medium.en",
"distil-small.en": "Systran/faster-distil-whisper-small.en",
"distil-large-v3": "Systran/faster-distil-whisper-large-v3",
"large-v3-turbo": "mobiuslabsgmbh/faster-whisper-large-v3-turbo",
"turbo": "mobiuslabsgmbh/faster-whisper-large-v3-turbo",
}
def available_models() -> List[str]:
"""Returns the names of available models."""
return list(_MODELS.keys())
def get_assets_path():
"""Returns the path to the assets directory."""
return os.path.join(os.path.dirname(os.path.abspath(__file__)), "assets")
def get_logger():
"""Returns the module logger."""
return logging.getLogger("faster_whisper")
def download_model(
size_or_id: str,
output_dir: Optional[str] = None,
local_files_only: bool = False,
cache_dir: Optional[str] = None,
):
"""Downloads a CTranslate2 Whisper model from the Hugging Face Hub.
Args:
size_or_id: Size of the model to download from https://huggingface.co/Systran
(tiny, tiny.en, base, base.en, small, small.en, distil-small.en, medium, medium.en,
distil-medium.en, large-v1, large-v2, large-v3, large, distil-large-v2,
distil-large-v3), or a CTranslate2-converted model ID from the Hugging Face Hub
(e.g. Systran/faster-whisper-large-v3).
output_dir: Directory where the model should be saved. If not set, the model is saved in
the cache directory.
local_files_only: If True, avoid downloading the file and return the path to the local
cached file if it exists.
cache_dir: Path to the folder where cached files are stored.
Returns:
The path to the downloaded model.
Raises:
ValueError: if the model size is invalid.
"""
if re.match(r".*/.*", size_or_id):
repo_id = size_or_id
else:
repo_id = _MODELS.get(size_or_id)
if repo_id is None:
raise ValueError(
"Invalid model size '%s', expected one of: %s"
% (size_or_id, ", ".join(_MODELS.keys()))
)
allow_patterns = [
"config.json",
"preprocessor_config.json",
"model.bin",
"tokenizer.json",
"vocabulary.*",
]
kwargs = {
"local_files_only": local_files_only,
"allow_patterns": allow_patterns,
"tqdm_class": disabled_tqdm,
}
if output_dir is not None:
kwargs["local_dir"] = output_dir
kwargs["local_dir_use_symlinks"] = False
if cache_dir is not None:
kwargs["cache_dir"] = cache_dir
try:
return huggingface_hub.snapshot_download(repo_id, **kwargs)
except (
huggingface_hub.utils.HfHubHTTPError,
requests.exceptions.ConnectionError,
) as exception:
logger = get_logger()
logger.warning(
"An error occured while synchronizing the model %s from the Hugging Face Hub:\n%s",
repo_id,
exception,
)
logger.warning(
"Trying to load the model directly from the local cache, if it exists."
)
kwargs["local_files_only"] = True
return huggingface_hub.snapshot_download(repo_id, **kwargs)
def format_timestamp(
seconds: float,
always_include_hours: bool = False,
decimal_marker: str = ".",
) -> str:
assert seconds >= 0, "non-negative timestamp expected"
milliseconds = round(seconds * 1000.0)
hours = milliseconds // 3_600_000
milliseconds -= hours * 3_600_000
minutes = milliseconds // 60_000
milliseconds -= minutes * 60_000
seconds = milliseconds // 1_000
milliseconds -= seconds * 1_000
hours_marker = f"{hours:02d}:" if always_include_hours or hours > 0 else ""
return (
f"{hours_marker}{minutes:02d}:{seconds:02d}{decimal_marker}{milliseconds:03d}"
)
class disabled_tqdm(tqdm):
def __init__(self, *args, **kwargs):
kwargs["disable"] = True
super().__init__(*args, **kwargs)
def get_end(segments: List[dict]) -> Optional[float]:
return next(
(w["end"] for s in reversed(segments) for w in reversed(s["words"])),
segments[-1]["end"] if segments else None,
)

372
faster_whisper/vad.py Normal file
View File

@ -0,0 +1,372 @@
import bisect
import functools
import os
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
import numpy as np
from faster_whisper.utils import get_assets_path
# The code below is adapted from https://github.com/snakers4/silero-vad.
@dataclass
class VadOptions:
"""VAD options.
Attributes:
threshold: Speech threshold. Silero VAD outputs speech probabilities for each audio chunk,
probabilities ABOVE this value are considered as SPEECH. It is better to tune this
parameter for each dataset separately, but "lazy" 0.5 is pretty good for most datasets.
neg_threshold: Silence threshold for determining the end of speech. If a probability is lower
than neg_threshold, it is always considered silence. Values higher than neg_threshold
are only considered speech if the previous sample was classified as speech; otherwise,
they are treated as silence. This parameter helps refine the detection of speech
transitions, ensuring smoother segment boundaries.
min_speech_duration_ms: Final speech chunks shorter min_speech_duration_ms are thrown out.
max_speech_duration_s: Maximum duration of speech chunks in seconds. Chunks longer
than max_speech_duration_s will be split at the timestamp of the last silence that
lasts more than 100ms (if any), to prevent aggressive cutting. Otherwise, they will be
split aggressively just before max_speech_duration_s.
min_silence_duration_ms: In the end of each speech chunk wait for min_silence_duration_ms
before separating it
speech_pad_ms: Final speech chunks are padded by speech_pad_ms each side
"""
threshold: float = 0.5
neg_threshold: float = None
min_speech_duration_ms: int = 0
max_speech_duration_s: float = float("inf")
min_silence_duration_ms: int = 2000
speech_pad_ms: int = 400
def get_speech_timestamps(
audio: np.ndarray,
vad_options: Optional[VadOptions] = None,
sampling_rate: int = 16000,
**kwargs,
) -> List[dict]:
"""This method is used for splitting long audios into speech chunks using silero VAD.
Args:
audio: One dimensional float array.
vad_options: Options for VAD processing.
sampling rate: Sampling rate of the audio.
kwargs: VAD options passed as keyword arguments for backward compatibility.
Returns:
List of dicts containing begin and end samples of each speech chunk.
"""
if vad_options is None:
vad_options = VadOptions(**kwargs)
threshold = vad_options.threshold
neg_threshold = vad_options.neg_threshold
min_speech_duration_ms = vad_options.min_speech_duration_ms
max_speech_duration_s = vad_options.max_speech_duration_s
min_silence_duration_ms = vad_options.min_silence_duration_ms
window_size_samples = 512
speech_pad_ms = vad_options.speech_pad_ms
min_speech_samples = sampling_rate * min_speech_duration_ms / 1000
speech_pad_samples = sampling_rate * speech_pad_ms / 1000
max_speech_samples = (
sampling_rate * max_speech_duration_s
- window_size_samples
- 2 * speech_pad_samples
)
min_silence_samples = sampling_rate * min_silence_duration_ms / 1000
min_silence_samples_at_max_speech = sampling_rate * 98 / 1000
audio_length_samples = len(audio)
model = get_vad_model()
padded_audio = np.pad(
audio, (0, window_size_samples - audio.shape[0] % window_size_samples)
)
speech_probs = model(padded_audio.reshape(1, -1)).squeeze(0)
triggered = False
speeches = []
current_speech = {}
if neg_threshold is None:
neg_threshold = max(threshold - 0.15, 0.01)
# to save potential segment end (and tolerate some silence)
temp_end = 0
# to save potential segment limits in case of maximum segment size reached
prev_end = next_start = 0
for i, speech_prob in enumerate(speech_probs):
if (speech_prob >= threshold) and temp_end:
temp_end = 0
if next_start < prev_end:
next_start = window_size_samples * i
if (speech_prob >= threshold) and not triggered:
triggered = True
current_speech["start"] = window_size_samples * i
continue
if (
triggered
and (window_size_samples * i) - current_speech["start"] > max_speech_samples
):
if prev_end:
current_speech["end"] = prev_end
speeches.append(current_speech)
current_speech = {}
# previously reached silence (< neg_thres) and is still not speech (< thres)
if next_start < prev_end:
triggered = False
else:
current_speech["start"] = next_start
prev_end = next_start = temp_end = 0
else:
current_speech["end"] = window_size_samples * i
speeches.append(current_speech)
current_speech = {}
prev_end = next_start = temp_end = 0
triggered = False
continue
if (speech_prob < neg_threshold) and triggered:
if not temp_end:
temp_end = window_size_samples * i
# condition to avoid cutting in very short silence
if (window_size_samples * i) - temp_end > min_silence_samples_at_max_speech:
prev_end = temp_end
if (window_size_samples * i) - temp_end < min_silence_samples:
continue
else:
current_speech["end"] = temp_end
if (
current_speech["end"] - current_speech["start"]
) > min_speech_samples:
speeches.append(current_speech)
current_speech = {}
prev_end = next_start = temp_end = 0
triggered = False
continue
if (
current_speech
and (audio_length_samples - current_speech["start"]) > min_speech_samples
):
current_speech["end"] = audio_length_samples
speeches.append(current_speech)
for i, speech in enumerate(speeches):
if i == 0:
speech["start"] = int(max(0, speech["start"] - speech_pad_samples))
if i != len(speeches) - 1:
silence_duration = speeches[i + 1]["start"] - speech["end"]
if silence_duration < 2 * speech_pad_samples:
speech["end"] += int(silence_duration // 2)
speeches[i + 1]["start"] = int(
max(0, speeches[i + 1]["start"] - silence_duration // 2)
)
else:
speech["end"] = int(
min(audio_length_samples, speech["end"] + speech_pad_samples)
)
speeches[i + 1]["start"] = int(
max(0, speeches[i + 1]["start"] - speech_pad_samples)
)
else:
speech["end"] = int(
min(audio_length_samples, speech["end"] + speech_pad_samples)
)
return speeches
def collect_chunks(
audio: np.ndarray, chunks: List[dict], sampling_rate: int = 16000
) -> Tuple[List[np.ndarray], List[Dict[str, int]]]:
"""Collects audio chunks."""
if not chunks:
chunk_metadata = {
"start_time": 0,
"end_time": 0,
}
return [np.array([], dtype=np.float32)], [chunk_metadata]
audio_chunks = []
chunks_metadata = []
for chunk in chunks:
chunk_metadata = {
"start_time": chunk["start"] / sampling_rate,
"end_time": chunk["end"] / sampling_rate,
}
audio_chunks.append(audio[chunk["start"] : chunk["end"]])
chunks_metadata.append(chunk_metadata)
return audio_chunks, chunks_metadata
class SpeechTimestampsMap:
"""Helper class to restore original speech timestamps."""
def __init__(self, chunks: List[dict], sampling_rate: int, time_precision: int = 2):
self.sampling_rate = sampling_rate
self.time_precision = time_precision
self.chunk_end_sample = []
self.total_silence_before = []
previous_end = 0
silent_samples = 0
for chunk in chunks:
silent_samples += chunk["start"] - previous_end
previous_end = chunk["end"]
self.chunk_end_sample.append(chunk["end"] - silent_samples)
self.total_silence_before.append(silent_samples / sampling_rate)
def get_original_time(
self,
time: float,
chunk_index: Optional[int] = None,
) -> float:
if chunk_index is None:
chunk_index = self.get_chunk_index(time)
total_silence_before = self.total_silence_before[chunk_index]
return round(total_silence_before + time, self.time_precision)
def get_chunk_index(self, time: float) -> int:
sample = int(time * self.sampling_rate)
return min(
bisect.bisect(self.chunk_end_sample, sample),
len(self.chunk_end_sample) - 1,
)
@functools.lru_cache
def get_vad_model():
"""Returns the VAD model instance."""
encoder_path = os.path.join(get_assets_path(), "silero_encoder_v5.onnx")
decoder_path = os.path.join(get_assets_path(), "silero_decoder_v5.onnx")
return SileroVADModel(encoder_path, decoder_path)
class SileroVADModel:
def __init__(self, encoder_path, decoder_path):
try:
import onnxruntime
except ImportError as e:
raise RuntimeError(
"Applying the VAD filter requires the onnxruntime package"
) from e
opts = onnxruntime.SessionOptions()
opts.inter_op_num_threads = 1
opts.intra_op_num_threads = 1
opts.enable_cpu_mem_arena = False
opts.log_severity_level = 4
self.encoder_session = onnxruntime.InferenceSession(
encoder_path,
providers=["CPUExecutionProvider"],
sess_options=opts,
)
self.decoder_session = onnxruntime.InferenceSession(
decoder_path,
providers=["CPUExecutionProvider"],
sess_options=opts,
)
def __call__(
self, audio: np.ndarray, num_samples: int = 512, context_size_samples: int = 64
):
assert (
audio.ndim == 2
), "Input should be a 2D array with size (batch_size, num_samples)"
assert (
audio.shape[1] % num_samples == 0
), "Input size should be a multiple of num_samples"
batch_size = audio.shape[0]
state = np.zeros((2, batch_size, 128), dtype="float32")
context = np.zeros(
(batch_size, context_size_samples),
dtype="float32",
)
batched_audio = audio.reshape(batch_size, -1, num_samples)
context = batched_audio[..., -context_size_samples:]
context[:, -1] = 0
context = np.roll(context, 1, 1)
batched_audio = np.concatenate([context, batched_audio], 2)
batched_audio = batched_audio.reshape(-1, num_samples + context_size_samples)
encoder_batch_size = 10000
num_segments = batched_audio.shape[0]
encoder_outputs = []
for i in range(0, num_segments, encoder_batch_size):
encoder_output = self.encoder_session.run(
None, {"input": batched_audio[i : i + encoder_batch_size]}
)[0]
encoder_outputs.append(encoder_output)
encoder_output = np.concatenate(encoder_outputs, axis=0)
encoder_output = encoder_output.reshape(batch_size, -1, 128)
decoder_outputs = []
for window in np.split(encoder_output, encoder_output.shape[1], axis=1):
out, state = self.decoder_session.run(
None, {"input": window.squeeze(1), "state": state}
)
decoder_outputs.append(out)
out = np.stack(decoder_outputs, axis=1).squeeze(-1)
return out
def merge_segments(segments_list, vad_options: VadOptions, sampling_rate: int = 16000):
if not segments_list:
return []
curr_end = 0
seg_idxs = []
merged_segments = []
edge_padding = vad_options.speech_pad_ms * sampling_rate // 1000
chunk_length = vad_options.max_speech_duration_s * sampling_rate
curr_start = segments_list[0]["start"]
for idx, seg in enumerate(segments_list):
# if any segment start timing is less than previous segment end timing,
# reset the edge padding. Similarly for end timing.
if idx > 0:
if seg["start"] < segments_list[idx - 1]["end"]:
seg["start"] += edge_padding
if idx < len(segments_list) - 1:
if seg["end"] > segments_list[idx + 1]["start"]:
seg["end"] -= edge_padding
if seg["end"] - curr_start > chunk_length and curr_end - curr_start > 0:
merged_segments.append(
{
"start": curr_start,
"end": curr_end,
"segments": seg_idxs,
}
)
curr_start = seg["start"]
seg_idxs = []
curr_end = seg["end"]
seg_idxs.append((seg["start"], seg["end"]))
# add final
merged_segments.append(
{
"start": curr_start,
"end": curr_end,
"segments": seg_idxs,
}
)
return merged_segments

View File

@ -0,0 +1,3 @@
"""Version information."""
__version__ = "1.1.1"

View File

@ -0,0 +1 @@
transformers[torch]>=4.23

6
requirements.txt Normal file
View File

@ -0,0 +1,6 @@
ctranslate2>=4.0,<5
huggingface_hub>=0.13
tokenizers>=0.13,<1
onnxruntime>=1.14,<2
av>=11
tqdm

9
setup.cfg Normal file
View File

@ -0,0 +1,9 @@
[flake8]
max-line-length = 100
ignore =
E203,
W503,
[isort]
profile=black
lines_between_types=1

67
setup.py Normal file
View File

@ -0,0 +1,67 @@
import os
from setuptools import find_packages, setup
base_dir = os.path.dirname(os.path.abspath(__file__))
def get_long_description():
readme_path = os.path.join(base_dir, "README.md")
with open(readme_path, encoding="utf-8") as readme_file:
return readme_file.read()
def get_project_version():
version_path = os.path.join(base_dir, "faster_whisper", "version.py")
version = {}
with open(version_path, encoding="utf-8") as fp:
exec(fp.read(), version)
return version["__version__"]
def get_requirements(path):
with open(path, encoding="utf-8") as requirements:
return [requirement.strip() for requirement in requirements]
install_requires = get_requirements(os.path.join(base_dir, "requirements.txt"))
conversion_requires = get_requirements(
os.path.join(base_dir, "requirements.conversion.txt")
)
setup(
name="faster-whisper",
version=get_project_version(),
license="MIT",
description="Faster Whisper transcription with CTranslate2",
long_description=get_long_description(),
long_description_content_type="text/markdown",
author="Guillaume Klein",
url="https://github.com/SYSTRAN/faster-whisper",
classifiers=[
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"Intended Audience :: Science/Research",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
],
keywords="openai whisper speech ctranslate2 inference quantization transformer",
python_requires=">=3.9",
install_requires=install_requires,
extras_require={
"conversion": conversion_requires,
"dev": [
"black==23.*",
"flake8==6.*",
"isort==5.*",
"pytest==7.*",
],
},
packages=find_packages(),
include_package_data=True,
)

18
tests/conftest.py Normal file
View File

@ -0,0 +1,18 @@
import os
import pytest
@pytest.fixture
def data_dir():
return os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")
@pytest.fixture
def jfk_path(data_dir):
return os.path.join(data_dir, "jfk.flac")
@pytest.fixture
def physcisworks_path(data_dir):
return os.path.join(data_dir, "physicsworks.wav")

BIN
tests/data/hotwords.mp3 Normal file

Binary file not shown.

BIN
tests/data/jfk.flac Normal file

Binary file not shown.

BIN
tests/data/multilingual.mp3 Normal file

Binary file not shown.

BIN
tests/data/physicsworks.wav Normal file

Binary file not shown.

Binary file not shown.

120
tests/test_tokenizer.py Normal file
View File

@ -0,0 +1,120 @@
from faster_whisper import WhisperModel
from faster_whisper.tokenizer import Tokenizer
from faster_whisper.transcribe import get_suppressed_tokens
def test_suppressed_tokens_minus_1():
model = WhisperModel("tiny.en")
tokenizer = Tokenizer(model.hf_tokenizer, False)
tokens = get_suppressed_tokens(tokenizer, [-1])
assert tokens == (
1,
2,
7,
8,
9,
10,
14,
25,
26,
27,
28,
29,
31,
58,
59,
60,
61,
62,
63,
90,
91,
92,
93,
357,
366,
438,
532,
685,
705,
796,
930,
1058,
1220,
1267,
1279,
1303,
1343,
1377,
1391,
1635,
1782,
1875,
2162,
2361,
2488,
3467,
4008,
4211,
4600,
4808,
5299,
5855,
6329,
7203,
9609,
9959,
10563,
10786,
11420,
11709,
11907,
13163,
13697,
13700,
14808,
15306,
16410,
16791,
17992,
19203,
19510,
20724,
22305,
22935,
27007,
30109,
30420,
33409,
34949,
40283,
40493,
40549,
47282,
49146,
50257,
50357,
50358,
50359,
50360,
)
def test_suppressed_tokens_minus_value():
model = WhisperModel("tiny.en")
tokenizer = Tokenizer(model.hf_tokenizer, False)
tokens = get_suppressed_tokens(tokenizer, [13])
assert tokens == (13, 50257, 50357, 50358, 50359, 50360)
def test_split_on_unicode():
model = WhisperModel("tiny")
tokenizer = Tokenizer(model.hf_tokenizer, False)
tokens = [8404, 871, 287, 6, 246, 526, 3210, 20378]
words, word_tokens = tokenizer.split_tokens_on_unicode(tokens)
assert words == [" elle", " est", " l", "'", "\ufffd", "é", "rit", "oire"]
assert word_tokens == [[8404], [871], [287], [6], [246], [526], [3210], [20378]]

271
tests/test_transcribe.py Normal file
View File

@ -0,0 +1,271 @@
import inspect
import os
import numpy as np
from faster_whisper import BatchedInferencePipeline, WhisperModel, decode_audio
def test_supported_languages():
model = WhisperModel("tiny.en")
assert model.supported_languages == ["en"]
def test_transcribe(jfk_path):
model = WhisperModel("tiny")
segments, info = model.transcribe(jfk_path, word_timestamps=True)
assert info.all_language_probs is not None
assert info.language == "en"
assert info.language_probability > 0.9
assert info.duration == 11
# Get top language info from all results, which should match the
# already existing metadata
top_lang, top_lang_score = info.all_language_probs[0]
assert info.language == top_lang
assert abs(info.language_probability - top_lang_score) < 1e-16
segments = list(segments)
assert len(segments) == 1
segment = segments[0]
assert segment.text == (
" And so my fellow Americans, ask not what your country can do for you, "
"ask what you can do for your country."
)
assert segment.text == "".join(word.word for word in segment.words)
assert segment.start == segment.words[0].start
assert segment.end == segment.words[-1].end
batched_model = BatchedInferencePipeline(model=model)
result, info = batched_model.transcribe(
jfk_path, word_timestamps=True, vad_filter=False
)
assert info.language == "en"
assert info.language_probability > 0.7
segments = []
for segment in result:
segments.append(
{"start": segment.start, "end": segment.end, "text": segment.text}
)
assert len(segments) == 1
assert segment.text == (
" And so my fellow Americans ask not what your country can do for you, "
"ask what you can do for your country."
)
def test_batched_transcribe(physcisworks_path):
model = WhisperModel("tiny")
batched_model = BatchedInferencePipeline(model=model)
result, info = batched_model.transcribe(physcisworks_path, batch_size=16)
assert info.language == "en"
assert info.language_probability > 0.7
segments = []
for segment in result:
segments.append(
{"start": segment.start, "end": segment.end, "text": segment.text}
)
# number of near 30 sec segments
assert len(segments) == 7
result, info = batched_model.transcribe(
physcisworks_path,
batch_size=16,
without_timestamps=False,
word_timestamps=True,
)
segments = []
for segment in result:
assert segment.words is not None
segments.append(
{"start": segment.start, "end": segment.end, "text": segment.text}
)
assert len(segments) > 7
def test_empty_audio():
audio = np.asarray([], dtype="float32")
model = WhisperModel("tiny")
pipeline = BatchedInferencePipeline(model=model)
assert list(model.transcribe(audio)[0]) == []
assert list(pipeline.transcribe(audio)[0]) == []
model.detect_language(audio)
def test_prefix_with_timestamps(jfk_path):
model = WhisperModel("tiny")
segments, _ = model.transcribe(jfk_path, prefix="And so my fellow Americans")
segments = list(segments)
assert len(segments) == 1
segment = segments[0]
assert segment.text == (
" And so my fellow Americans, ask not what your country can do for you, "
"ask what you can do for your country."
)
assert segment.start == 0
assert 10 < segment.end <= 11
def test_vad(jfk_path):
model = WhisperModel("tiny")
segments, info = model.transcribe(
jfk_path,
vad_filter=True,
vad_parameters=dict(min_silence_duration_ms=500, speech_pad_ms=200),
)
segments = list(segments)
assert len(segments) == 1
segment = segments[0]
assert segment.text == (
" And so my fellow Americans ask not what your country can do for you, "
"ask what you can do for your country."
)
assert 0 < segment.start < 1
assert 10 < segment.end < 11
assert info.vad_options.min_silence_duration_ms == 500
assert info.vad_options.speech_pad_ms == 200
def test_stereo_diarization(data_dir):
model = WhisperModel("tiny")
audio_path = os.path.join(data_dir, "stereo_diarization.wav")
left, right = decode_audio(audio_path, split_stereo=True)
segments, _ = model.transcribe(left)
transcription = "".join(segment.text for segment in segments).strip()
assert transcription == (
"He began a confused complaint against the wizard, "
"who had vanished behind the curtain on the left."
)
segments, _ = model.transcribe(right)
transcription = "".join(segment.text for segment in segments).strip()
assert transcription == "The horizon seems extremely distant."
def test_multilingual_transcription(data_dir):
model = WhisperModel("tiny")
pipeline = BatchedInferencePipeline(model)
audio_path = os.path.join(data_dir, "multilingual.mp3")
audio = decode_audio(audio_path)
segments, info = model.transcribe(
audio,
multilingual=True,
without_timestamps=True,
condition_on_previous_text=False,
)
segments = list(segments)
assert (
segments[0].text
== " Permission is hereby granted, free of charge, to any person obtaining a copy of the"
" software and associated documentation files to deal in the software without restriction,"
" including without limitation the rights to use, copy, modify, merge, publish, distribute"
", sublicence, and or cell copies of the software, and to permit persons to whom the "
"software is furnished to do so, subject to the following conditions. The above copyright"
" notice and this permission notice, shall be included in all copies or substantial "
"portions of the software."
)
assert (
segments[1].text
== " Jedem, der dieses Software und die dazu gehöregen Dokumentationsdatein erhält, wird "
"hiermit unengeltlich die Genehmigung erteilt, wird der Software und eingeschränkt zu "
"verfahren. Dies umfasst insbesondere das Recht, die Software zu verwenden, zu "
"vervielfältigen, zu modifizieren, zu Samenzofügen, zu veröffentlichen, zu verteilen, "
"unterzulizenzieren und oder kopieren der Software zu verkaufen und diese Rechte "
"unterfolgen den Bedingungen anderen zu übertragen."
)
segments, info = pipeline.transcribe(audio, multilingual=True)
segments = list(segments)
assert (
segments[0].text
== " Permission is hereby granted, free of charge, to any person obtaining a copy of the"
" software and associated documentation files to deal in the software without restriction,"
" including without limitation the rights to use, copy, modify, merge, publish, distribute"
", sublicence, and or cell copies of the software, and to permit persons to whom the "
"software is furnished to do so, subject to the following conditions. The above copyright"
" notice and this permission notice, shall be included in all copies or substantial "
"portions of the software."
)
assert (
"Dokumentationsdatein erhält, wird hiermit unengeltlich die Genehmigung erteilt,"
" wird der Software und eingeschränkt zu verfahren. Dies umfasst insbesondere das Recht,"
" die Software zu verwenden, zu vervielfältigen, zu modifizieren"
in segments[1].text
)
def test_hotwords(data_dir):
model = WhisperModel("tiny")
pipeline = BatchedInferencePipeline(model)
audio_path = os.path.join(data_dir, "hotwords.mp3")
audio = decode_audio(audio_path)
segments, info = model.transcribe(audio, hotwords="ComfyUI")
segments = list(segments)
assert "ComfyUI" in segments[0].text
assert info.transcription_options.hotwords == "ComfyUI"
segments, info = pipeline.transcribe(audio, hotwords="ComfyUI")
segments = list(segments)
assert "ComfyUI" in segments[0].text
assert info.transcription_options.hotwords == "ComfyUI"
def test_transcribe_signature():
model_transcribe_args = set(inspect.getargs(WhisperModel.transcribe.__code__).args)
pipeline_transcribe_args = set(
inspect.getargs(BatchedInferencePipeline.transcribe.__code__).args
)
pipeline_transcribe_args.remove("batch_size")
assert model_transcribe_args == pipeline_transcribe_args
def test_monotonic_timestamps(physcisworks_path):
model = WhisperModel("tiny")
pipeline = BatchedInferencePipeline(model=model)
segments, info = model.transcribe(physcisworks_path, word_timestamps=True)
segments = list(segments)
for i in range(len(segments) - 1):
assert segments[i].start <= segments[i].end
assert segments[i].end <= segments[i + 1].start
for word in segments[i].words:
assert word.start <= word.end
assert word.end <= segments[i].end
assert segments[-1].end <= info.duration
segments, info = pipeline.transcribe(physcisworks_path, word_timestamps=True)
segments = list(segments)
for i in range(len(segments) - 1):
assert segments[i].start <= segments[i].end
assert segments[i].end <= segments[i + 1].start
for word in segments[i].words:
assert word.start <= word.end
assert word.end <= segments[i].end
assert segments[-1].end <= info.duration

29
tests/test_utils.py Normal file
View File

@ -0,0 +1,29 @@
import os
from faster_whisper import available_models, download_model
def test_available_models():
models = available_models()
assert isinstance(models, list)
assert "tiny" in models
def test_download_model(tmpdir):
output_dir = str(tmpdir.join("model"))
model_dir = download_model("tiny", output_dir=output_dir)
assert model_dir == output_dir
assert os.path.isdir(model_dir)
assert not os.path.islink(model_dir)
for filename in os.listdir(model_dir):
path = os.path.join(model_dir, filename)
assert not os.path.islink(path)
def test_download_model_in_cache(tmpdir):
cache_dir = str(tmpdir.join("model"))
download_model("tiny", cache_dir=cache_dir)
assert os.path.isdir(cache_dir)