Source code for tfds_defect_detection.downloader.mvtec
import shutil
from pathlib import Path
from keras.utils import get_file
from tqdm import tqdm
import PIL.Image
from tfds_defect_detection.utils import copy_to_folder
def _prepare_good_images(
download_directory: Path,
new_root="train_images",
skip_if_image_exists=True
):
old_root = download_directory.name
result = Path(str(download_directory).replace(old_root, new_root))
all_images = list(download_directory.rglob("**/*train/good/*.*"))
if not len(all_images) and result.is_dir():
return result
print("Preparing", old_root, new_root)
for path in tqdm(all_images):
if path.name.startswith("."):
continue
target = Path(str(path).replace(old_root, new_root))
if skip_if_image_exists and target.is_file():
continue
copy_to_folder(path, target)
return result
def _prepare_anomaly_images_with_masks(
download_directory: Path,
new_root_images="test_images",
new_root_masks="test_masks",
mask_suffix="_mask",
skip_if_image_exists=True
):
old_root = download_directory.name
result = (
Path(str(download_directory).replace(old_root, new_root_images)),
Path(str(download_directory).replace(old_root, new_root_masks))
)
all_images = list(download_directory.rglob("**/*test/**/*.*"))
if not len(all_images) and result[0].is_dir() and result[1].is_dir():
return result
print("Preparing", old_root, new_root_images, "and", new_root_masks)
for img_path in tqdm(all_images):
if img_path.name.startswith("."):
continue
img_target = Path(str(img_path).replace(old_root, new_root_images))
if skip_if_image_exists and img_target.is_file():
continue
mask_path = Path(str(img_path).replace("test", "ground_truth"))
mask_path = mask_path.parent / (mask_path.stem + mask_suffix + ".png")
mask_target = Path(str(img_path).replace(old_root, new_root_masks))
mask_target = mask_target.with_suffix(".png")
copy_to_folder(img_path, img_target)
if not mask_path.is_file():
img_size = PIL.Image.open(img_path).size
mask = PIL.Image.new(mode="L", size=img_size, color="black")
mask_target.parent.mkdir(exist_ok=True, parents=True)
mask.save(mask_target)
elif not mask_target.is_file():
copy_to_folder(mask_path, mask_target)
return result
[docs]def restructure_mvtec_style_dataset(
root_dir: Path,
new_root_train_images="train_images",
new_root_test_images="test_images",
new_root_test_masks="test_masks",
mask_suffix="_mask",
delete_tmp=True
):
result = (
_prepare_good_images(
root_dir,
new_root_train_images
),
*_prepare_anomaly_images_with_masks(
root_dir,
new_root_test_images,
new_root_test_masks,
mask_suffix
)
)
if delete_tmp:
shutil.rmtree(root_dir, ignore_errors=True)
root_dir.mkdir(parents=True, exist_ok=True)
return result
if __name__ == "__main__":
_prepare_anomaly_images_with_masks(
Path("E:\\VisA_mvtec_style"),
mask_suffix=""
)
[docs]def download_and_extract(cache_dir):
mvtec_name = "mvtec_anomaly_detection.tar.xz"
mvtec_folder_name = "mvtec_download"
mvtec_root = cache_dir / mvtec_folder_name
if not mvtec_root.is_dir():
get_file(
fname=mvtec_name,
origin=MVTEC_ORIGIN,
extract=True,
cache_dir=cache_dir,
cache_subdir=mvtec_folder_name
)
return mvtec_root
[docs]MVTEC_ORIGIN = "https://www.mydrive.ch/shares/38536/3830184030e49fe74747669442f0f282/download/420938113-1629952094/mvtec_anomaly_detection.tar.xz"