目录
2.fpei文件下
2.6 number_solver.py
2.7 process.py
2.8 taichi_solver.py
3. 算法总结
4. 代码运行
4.1 测试
4.2 基于GUI后端自定义框输出编辑图像结果
4.2.1 下载open-cv
4.2.2 输入命令
4.2.3 自定义框
4.2.4 按ESC退出
接续Fast-Poisson-Image-Editing代码介绍(一)
2.fpei文件下
该包中的代码与解析如下:
2.6 number_solver.py
提供了通过 Numba 加速的 Jacobi 方法的方程求解器和网格求解器的实现。这些实现包含了用于初始化、重置状态、同步状态、执行迭代步骤等方法。使用 Numba 的 njit 装饰器加速部分计算密集型的迭代函数,提高求解器的性能。
from typing import Tuple # 导入了 Tuple类型,用于指示函数或变量应该是元组类型
import numpy as np # # 导入了 numpy模块,并为它分配一个别名 np.NumPy是Python中用于数值计算的强大库。提供了对大型多维数组和矩阵的支持,以及对这些数组进行操作的数学函数集合
from numba import njit # 从numba模块中导入了njit Numba是一个用于Python的实时(JIT)编译器,它在运行时将Python函数翻译成机器码,这可以显著提高数值计算的执行速度
# equ_iter函数是一个被njit装饰的函数,用于执行方程求解器的迭代步骤。
# 它接受矩阵的大小 N、矩阵 A、矩阵 B 和解向量 X 作为参数,并返回更新后的解向量
@njit(fastmath=True)
def equ_iter(N: int, A: np.ndarray, B: np.ndarray, X: np.ndarray) -> np.ndarray:
return (B + X[A[:, 0]] + X[A[:, 1]] + X[A[:, 2]] + X[A[:, 3]]) / 4.0
# grid_iter 函数是一个被njit装饰的函数,用于执行网格求解器的迭代步骤。
# 它接受梯度图像和目标图像作为参数,并返回更新后的目标图像
@njit(fastmath=True)
def grid_iter(grad: np.ndarray, tgt: np.ndarray) -> np.ndarray: # 定义了 grid_iter功能的。它有两个参数: grad和 tgt,这两个都是NumPy数组。函数的返回类型也是一个NumPy数组
result = grad.copy() # 创建了一个 grad数组,并将其赋给变量 result.这样就保证了原来的 grad在计算期间不会修改数组
result[1:] += tgt[:-1] # 添加了 tgt数组向上偏移一个像素到 result数组中
result[:-1] += tgt[1:] # 添加了 tgt数组向下偏移一个像素到 result数组中
result[:, 1:] += tgt[:, :-1] # 添加了 tgt数组向左偏移一个像素 result数组中
result[:, :-1] += tgt[:, 1:] # 添加了 tgt数组向右偏移一个像素 result数组中
return result # 返回结果数组 result
class EquSolver(object):
"""Numba-based Jacobi method equation solver implementation."""
def __init__(self) -> None:
super().__init__() # 调用父类的构造函数
self.N = 0 # 将实例对象N赋值为0
# 该方法接受一个 np.ndarray 类型的参数 mask,表示掩码图像
# 函数返回一个 np.ndarray 类型的对象,表示生成的分区图像
def partition(self, mask: np.ndarray) -> np.ndarray:
return np.cumsum((mask > 0).reshape(-1)).reshape(mask.shape)
# 该方法用于重置求解器的状态,并接受四个参数:N(表示矩阵大小)、A(矩阵 A)、X(解向量 X)和 B(矩阵 B)
def reset(self, N: int, A: np.ndarray, X: np.ndarray, B: np.ndarray) -> None:
"""(4 - A)X = B"""
self.N = N # 指定参数的值 N到实例变量 N对象
self.A = A # 指定参数的值 A到实例变量 A对象
self.B = B # 指定参数的值 B到实例变量 B对象
self.X = equ_iter(N, A, B, X) # 指定调用 equ_iter函数的参数 N, A, B, 和 X到实例变量 X对象
def sync(self) -> None:
pass # 一个占位符方法,可以在以后实现同步
# 该方法用于执行求解器的迭代步骤,并接受一个整数类型的参数 iteration,表示要执行的迭代次数。
# 函数返回一个元组,包含两个 np.ndarray 类型的对象,分别表示更新后的解向量和误差
def step(self, iteration: int) -> Tuple[np.ndarray, np.ndarray]:
for _ in range(iteration):
# X = (B + AX) / 4
self.X = equ_iter(self.N, self.A, self.B, self.X)
tmp = self.B + self.X[self.A[:, 0]] + self.X[self.A[:, 1]] + \
self.X[self.A[:, 2]] + self.X[self.A[:, 3]] - 4.0 * self.X
err = np.abs(tmp).sum(axis=0) # 计算错误的方法,取 tmp数组,沿指定的轴(轴= 0 )对它们求和,并将结果赋给 err变量的
x = self.X.copy() # 创建了一个 self.X数组,并将其赋给变量 x.这样就保证了原来的 self.X在计算期间不会修改数组
x[x < 0] = 0 # 设置了所有的 x小于0数组为0,它约束解向量的值 x为非阴性
x[x > 255] = 255 # 将x约束在0到255的范围内
return x, err # 返回约束后的解向量x和误差err的元组
# 该方法用于重置求解器的状态,并接受四个参数:
# N(表示网格大小)、mask(掩码图像)、tgt(目标图像)和grad(梯度图像)
class GridSolver(object):
"""Numba-based Jacobi method grid solver implementation."""
def __init__(self) -> None:
super().__init__() # 调用父类的构造函数
self.N = 0 # 将实例对象N赋值为0
def reset(
self, N: int, mask: np.ndarray, tgt: np.ndarray, grad: np.ndarray
) -> None: # 此方法用于重置对象的状态。它需要四个参数: N(整数), mask, tgt, 和 grad(这三个都是NumPy数组)。这个方法的返回类型是 None
self.N = N # 指定参数的值 N到实例变量 N对象
self.mask = mask # 定参数的值 mask到实例变量 mask对象
self.bool_mask = mask.astype(bool) # 使用 astype(bool)方法,将 mask数组转换为布尔类型
tmp = grid_iter(grad, tgt) # 通过调用grid_iter函数,将梯度图像grad和目标图像tgt作为参数来计算临时变量tmp
tgt[self.bool_mask] = tmp[self.bool_mask] / 4.0
self.tgt = tgt # 指定更新的目标图像( tgt)转换为实例变量 tgt对象
self.grad = grad # 指定参数的值 grad到实例变量 grad对象
def sync(self) -> None: # 方法为空
pass # 是一个占位符方法,可以在以后实现同步
def step(self, iteration: int) -> Tuple[np.ndarray, np.ndarray]: # 定义了 step类的方法。它取一个整数参数 iteration表示要执行的迭代次数。该方法的返回类型是包含两个NumPy数组的元组
for _ in range(iteration): # 开始一个循环,循环迭代 iteration次数
tgt = grid_iter(self.grad, self.tgt) # 调用 grid_iter函数,并传入 self.grad和 self.tgt作为参数,得到一个新的数组 tgt
self.tgt[self.bool_mask] = tgt[self.bool_mask] / 4.0 # 将 tgt中对应掩码为真的位置的值除以 4.0,并将结果赋值给 self.tgt中对应掩码为真的位置
# 在循环结束后,计算误差
tmp = 4 * self.tgt - self.grad # 创建一个临时数组 tmp,计算公式为 4 * self.tgt - self.grad
tmp[1:] -= self.tgt[:-1] # 将 self.tgt 在垂直方向上向下平移一行,并将结果与 tmp 对应位置相减
tmp[:-1] -= self.tgt[1:] # 将 self.tgt在垂直方向上向上平移一行,并将结果与 tmp对应位置相减
tmp[:, 1:] -= self.tgt[:, :-1] # 将 self.tgt在水平方向上向右平移一列,并将结果与 tmp对应位置相减
tmp[:, :-1] -= self.tgt[:, 1:] # 将 self.tgt在水平方向上向左平移一列,并将结果与 tmp对应位置相减
# 计算误差err
err = np.abs(tmp[self.bool_mask]).sum(axis=0) # 通过对更新后的目标图像self.tgt中对应掩码为真的位置的值进行绝对值求和
tgt = self.tgt.copy() # 创建一个副本tgt,将目标图像self.tgt复制给它
tgt[tgt < 0] = 0
tgt[tgt > 255] = 255 # 将tgt约束在0到255的范围内
return tgt, err # 返回约束后的目标图像tgt和误差err的元组
2.7 process.py
建立了一个图像修复处理器的框架,具有不同的后端,允许用户根据系统能力和偏好进行选择。它包括解决 PIE 方程和执行基于网格的算法的功能,处理依赖关系,并提供错误消息,以实现无缝的用户体验。
import os # 导入了 Python 的 os模块,用于与操作系统进行交互
from abc import ABC, abstractmethod # 从 abc模块中导入了 ABC和 abstractmethod,用于定义抽象基类和抽象方法
from typing import Any, Optional, Tuple # 从 typing模块中导入了 Any、 Optional和 Tuple,用于类型提示
import numpy as np # 导入了 numpy库,并将其命名为 np,用于进行数值计算和数组操作
from fpie import np_solver # 入了一个名为 np_solver的模块,用于提供一些与数值求解相关的功能或算法
CPU_COUNT = os.cpu_count() or 1 # 用于获取当前系统的CPU核心数量,如果无法获取到CPU核心数量,则将其设为 1
DEFAULT_BACKEND = "numpy" # 示默认的后端选择
ALL_BACKEND = ["numpy"] # 是一个列表,包含了所有可用的后端名称
try:
from fpie import numba_solver # 尝试导入numba_solver模块,并将其添加到ALL_BACKEND列表中
ALL_BACKEND += ["numba"] # 将字符串 "numba"添加到名为 ALL_BACKEND的列表变量中
DEFAULT_BACKEND = "numba" # 将字符串 "numba"赋值给名为 DEFAULT_BACKEND的变量
except ImportError: # 如果导入失败
numba_solver = None # type: ignore # 则说明用户安装Numba
try:
from fpie import taichi_solver # 尝试导入 taichi_solver 模块
ALL_BACKEND += ["taichi-cpu", "taichi-gpu"] # 将字符串列表 ["taichi-cpu", "taichi-gpu"]添加到名为 ALL_BACKEND的列表变量中
DEFAULT_BACKEND = "taichi-cpu" # 将字符串 "taichi-cpu"赋值给名为 DEFAULT_BACKEND的变量
except ImportError: # 如果导入失败
taichi_solver = None # type: ignore # 表示没有可用的Taichi后端
try:
from fpie import core_gcc # 尝试导入core_gcc模块
DEFAULT_BACKEND = "gcc" # 字符串 "gcc"赋值给名为 DEFAULT_BACKEND的变量
ALL_BACKEND.append("gcc") # 将字符串 "gcc"添加到名为 ALL_BACKEND的列表变量中
except ImportError: # 如果导入失败
core_gcc = None # 则说明用户安装core_gcc
try:
from fpie import core_openmp # type: # ignore 尝试导入core_openmp模块
DEFAULT_BACKEND = "openmp" # 将字符串 "openmp"赋值给名为 DEFAULT_BACKEND的变量
ALL_BACKEND.append("openmp") # 将字符串 "openmp"添加到名为 ALL_BACKEND的列表变量中
except ImportError: # 如果导入失败
core_openmp = None # 则说明用户安装core_openmp
try:
from mpi4py import MPI # :尝试从 mpi4py模块中导入 MPI
from fpie import core_mpi # type: ignore
ALL_BACKEND.append("mpi") # 将字符串 "mpi"添加到名为 ALL_BACKEND的列表变量中
except ImportError: # 如果导入失败
MPI = None # type: ignore
core_mpi = None # 则说明用户安装core_mpi
try:
from fpie import core_cuda # 从 fpie模块中导入 core_cuda
DEFAULT_BACKEND = "cuda" # 将字符串 "cuda"赋值给名为 DEFAULT_BACKEND的变量
ALL_BACKEND.append("cuda") # 将字符串 "cuda"添加到名为 ALL_BACKEND的列表变量中
except ImportError: # 如果导入失败
core_cuda = None # 则说明用户安装core_cuda
class BaseProcessor(ABC): # 定义了一个抽象基类BaseProcessor,用于处理图像修复算法中的处理器
"""API definition for processor class."""
def __init__(
self, gradient: str, rank: int, backend: str, core: Optional[Any]
): # 方法接受四个参数: gradient、 rank、 backend和 core。其中, gradient是一个字符串, rank是一个整数, backend是一个字符串, core是一个可选的任意类型
if core is None: # 如果 core为 None,即没有提供有效的核心模块,则执行以下操作
error_msg = { # 创建一个字典 error_msg,其中包含不同后端的错误消息
"numpy":
"Please run `pip install numpy`.",
"numba":
"Please run `pip install numba`.",
"gcc":
"Please install cmake and gcc in your operating system.",
"openmp":
"Please make sure your gcc is compatible with `-fopenmp` option.",
"mpi":
"Please install MPI and run `pip install mpi4py`.",
"cuda":
"Please make sure nvcc and cuda-related libraries are available.",
"taichi":
"Please run `pip install taichi`.",
}
print(error_msg[backend.split("-")[0]]) # 打印对应后端的错误消息,根据 backend字符串中的第一个部分来选择错误消息
raise AssertionError(f"Invalid backend {backend}.") # 抛出一个断言错误( AssertionError),并附带错误消息,表示后端无效
self.gradient = gradient # 表示梯度类型的字符串,可以是"src"、"avg"或其他
self.rank = rank # 表示处理器的排名或标识符的整数
self.backend = backend # 表示所使用的后端的字符串
self.core = core # 表示具体的核心处理对象,可以是任意类型的对象
self.root = rank == 0 # 表示是否为根节点的布尔值,根据 rank 的值进行判断
# 该方法用于混合两个梯度向量,并返回混合后的结果。
# 它接受两个 np.ndarray 类型的参数 a 和 b,表示要混合的两个梯度向量
def mixgrad(self, a: np.ndarray, b: np.ndarray) -> np.ndarray: # 定义了一个名为 mixgrad的方法,用于混合梯度的计算,该方法接受两个参数 a和 b,都是 NumPy 数组类型。方法的返回类型也是一个 NumPy 数组
if self.gradient == "src": # 值是"src",
return a # 则直接返回参数a
if self.gradient == "avg": # 值是"avg",
return (a + b) / 2 # 则将参数a和b相加后除以2
# mix gradient, see Equ. 12 in PIE paper
mask = np.abs(a) < np.abs(b) # 比较绝对值
a[mask] = b[mask] # # 选择性地 b中的元素替换到a中,以获得更大的梯度值
return a # 返回更新后的数组 a
@abstractmethod # 使用了 @abstractmethod装饰器,表示该方法是一个抽象方法,需要在子类中进行具体实现
def reset( # 用于重置处理器的状态
self,
src: np.ndarray, # 源图像的np.ndarray数组
mask: np.ndarray, # 掩码图像的np.ndarray数组
tgt: np.ndarray, # 目标图像的np.ndarray数组
mask_on_src: Tuple[int, int], # 一个元组,表示掩码在源图像上的位置偏移
mask_on_tgt: Tuple[int, int], # 一个元组,表示掩码在目标图像上的位置偏移
) -> int: # 返回一个整数值,表示最大 ID 值
pass
def sync(self) -> None: # 该方法没有参数,返回类型为 None
self.core.sync() # 用于同步核心模块的操作
@abstractmethod # 使用了 @abstractmethod装饰器,表示该方法是一个抽象方法,需要在子类中进行具体实现
def step(self, iteration: int) -> Optional[Tuple[np.ndarray, np.ndarray]]:
pass # 接受一个整数参数 iteration,返回类型为可选的元组,包含两个 NumPy 数组
class EquProcessor(BaseProcessor): # 定义了一个名为 EquProcessor的类,它是 BaseProcessor类的子类
"""PIE Jacobi equation processor.""" # 该类用于处理 PIE Jacobi 方程
def __init__(
self,
gradient: str = "max", # 表示梯度类型的字符串,默认为 "max"
backend: str = DEFAULT_BACKEND, # 表示所使用的后端的字符串,默认为DEFAULT_BACKEND
n_cpu: int = CPU_COUNT, # 表示使用的CPU核心数量,默认为CPU_COUNT
min_interval: int = 100, # 表示MPI最小间隔的整数,默认为100
block_size: int = 1024, # 表示CUDA中的块大小的整数,默认为1024
):
core: Optional[Any] = None
rank = 0
# 以下if语句用于根据所选的后端选择适当的核心处理对象,
# 并将其作为参数传递给父类BaseProcessor的初始化方法
if backend == "numpy": # 如果后端为 "numpy"
core = np_solver.EquSolver() # 则创建一个 np_solver.EquSolver 的实例并赋值给core变量。
elif backend == "numba" and numba_solver is not None: # 如果后端为"numba"且 numba_solver 不为空
core = numba_solver.EquSolver() # 则创建一个 numba_solver.EquSolver的实例
elif backend == "gcc": # 如果 backend是 "gcc",
core = core_gcc.EquSolver() # 则创建一个 core_gcc.EquSolver的实例
elif backend == "openmp" and core_openmp is not None: # 如果 backend是 "openmp"并且 core_openmp不为空
core = core_openmp.EquSolver(n_cpu) # 则创建一个带有 n_cpu参数的 core_openmp.EquSolver的实例
elif backend == "mpi" and core_mpi is not None: # 如果 backend是 "mpi"并且 core_mpi不为空
core = core_mpi.EquSolver(min_interval) # 则创建一个带有 min_interval参数的 core_mpi.EquSolver的实例,
rank = MPI.COMM_WORLD.Get_rank() # 并获取当前进程的 rank
elif backend == "cuda" and core_cuda is not None: # 如果 backend是 "cuda"并且 core_cuda不为空
core = core_cuda.EquSolver(block_size) # 则创建一个带有 block_size参数的 core_cuda.EquSolver的实例
elif backend.startswith("taichi") and taichi_solver is not None: # 如果 backend以 "taichi"开头并且 taichi_solver不为空
core = taichi_solver.EquSolver(backend, n_cpu, block_size) # 则创建一个带有 backend、 n_cpu和 block_size参数的 taichi_solver.EquSolver的实例
super().__init__(gradient, rank, backend, core) # 调用父类 BaseProcessor 的初始化方法,并传递梯度类型、排名、后端和核心处理对象作为参数进行初始化
def mask2index( # 调用这个方法,可以将掩码图像转换为相应的索引值,以便在图像修复算法中使用
self, mask: np.ndarray # 表示输入的掩码图像
) -> Tuple[np.ndarray, int, np.ndarray, np.ndarray]:
x, y = np.nonzero(mask) # 通过 np.nonzero(mask)获取掩码图像中非零元素的行坐标和列坐标,并分别赋值给 x和 y
max_id = x.shape[0] + 1 # 表示索引的最大值加1
index = np.zeros((max_id, 3)) # 用于存储索引信息
ids = self.core.partition(mask) # 调用核心处理对象的partition方法,将掩码图像传递给它,并获得分区后的结果ids
ids[mask == 0] = 0 # reserve id=0 for constant
index = ids[x, y].argsort() # 使用 ids[x, y]获取掩码图像中非零位置对应的索引值,通过 argsort()方法进行排序,排序后的结果赋值给变量 index
return ids, max_id, x[index], y[index] # 将 ids、 max_id、根据索引排序后的非零行坐标 x[index]和列坐标 y[index]组成一个元组,并作为方法的返回值
#
def reset( # 用于重置处理器的状态
self,
src: np.ndarray, # 表示源图像
mask: np.ndarray, # 表示掩码图像
tgt: np.ndarray, # 表示目标图像
mask_on_src: Tuple[int, int], # 表示掩码在源图像上的位置,是一个包含两个整数的元组
mask_on_tgt: Tuple[int, int], # 表示掩码在目标图像上的位置,是一个包含两个整数的元组
) -> int: # 返回类型是整数
assert self.root # 进行断言检查。如果 self.root的值为真,则继续执行;否则,会抛出 AssertionError异常
# check validity
# assert 0 <= mask_on_src[0] and 0 <= mask_on_src[1]
# assert mask_on_src[0] + mask.shape[0] <= src.shape[0]
# assert mask_on_src[1] + mask.shape[1] <= src.shape[1]
# assert mask_on_tgt[0] + mask.shape[0] <= tgt.shape[0]
# assert mask_on_tgt[1] + mask.shape[1] <= tgt.shape[1]
if len(mask.shape) == 3: # 如果掩码图像的维度为 3,
mask = mask.mean(-1) # 则取各通道的均值,将其转换为二维图像
mask = (mask >= 128).astype(np.int32) # 将掩码图像中大于等于 128 的像素值设为 1
# 对边缘进行零填充操作,将掩码图像的四个边界位置都设置为0
# zero-out edge
mask[0] = 0 # 将第一行的像素值设为 0
mask[-1] = 0 # 将最后一行的像素值设为 0
mask[:, 0] = 0 # 将第一列的像素值设为 0
mask[:, -1] = 0 # 将最后一列的像素值设为 0
x, y = np.nonzero(mask) # 获取掩码图像中非零元素的行坐标和列坐标,并分别赋值给 x和 y
x0, x1 = x.min() - 1, x.max() + 2 # 分别获取行坐标的最小值和最大值,并进行调整得到新的范围 x0和 x1
y0, y1 = y.min() - 1, y.max() + 2 # 分别获取列坐标的最小值和最大值,并进行调整得到新的范围 y0和 y1
mask_on_src = (x0 + mask_on_src[0], y0 + mask_on_src[1]) # 计算调整后的偏移
mask_on_tgt = (x0 + mask_on_tgt[0], y0 + mask_on_tgt[1]) # 计算调整后的偏移
mask = mask[x0:x1, y0:y1] # 根据调整后的索引范围 x0、 x1、 y0和 y1,从原始的掩码图像中提取相应区域的子图像,并将其赋值给变量 mask
ids, max_id, index_x, index_y = self.mask2index(mask) # 将调整后的掩码图像 mask作为参数传递给该方法,并获取返回的结果
src_x, src_y = index_x + mask_on_src[0], index_y + mask_on_src[1] # 将 index_x加上 mask_on_src[0]得到源图像中的行坐标 src_x,将 index_y加上 mask_on_src[1]得到源图像中的列坐标 src_y
tgt_x, tgt_y = index_x + mask_on_tgt[0], index_y + mask_on_tgt[1] # 将 index_x加上 mask_on_tgt[0]得到目标图像中的行坐标 tgt_x,将 index_y加上 mask_on_tgt[1]得到目标图像中的列坐标 tgt_y
# 根据调整后的索引和偏移,从源图像和目标图像中提取相应位置上的像素值,分别存储在以下变量中
src_C = src[src_x, src_y].astype(np.float32) # 通过索引操作 src[src_x, src_y],将源图像中 src_x和 src_y坐标位置上的像素值赋值给变量 src_C
src_U = src[src_x - 1, src_y].astype(np.float32) # 从源图像中提取了 src_x - 1行、 src_y列位置上的像素值,并将其转换为 np.float32类型,赋值给变量 src_U
src_D = src[src_x + 1, src_y].astype(np.float32) # 从源图像中提取了 src_x + 1行、 src_y列位置上的像素值,并将其转换为 np.float32类型,赋值给变量 src_D
src_L = src[src_x, src_y - 1].astype(np.float32) # 从源图像中提取了 src_x行、 src_y - 1列位置上的像素值,并将其转换为 np.float32类型,赋值给变量 src_L
src_R = src[src_x, src_y + 1].astype(np.float32) # 从源图像中提取了 src_x行、 src_y + 1列位置上的像素值,并将其转换为 np.float32类型,赋值给变量 src_R
tgt_C = tgt[tgt_x, tgt_y].astype(np.float32) # 从目标图像中提取了 tgt_x行、 tgt_y列位置上的像素值,并将其转换为 np.float32类型,赋值给变量 tgt_C
tgt_U = tgt[tgt_x - 1, tgt_y].astype(np.float32) # 从目标图像中提取了 tgt_x - 1行、 tgt_y列位置上的像素值,并将其转换为 np.float32类型,赋值给变量 tgt_U
tgt_D = tgt[tgt_x + 1, tgt_y].astype(np.float32) # 从目标图像中提取了 tgt_x + 1行、 tgt_y列位置上的像素值,并将其转换为 np.float32类型,赋值给变量 tgt_D
tgt_L = tgt[tgt_x, tgt_y - 1].astype(np.float32) # 从 tgt指定的原始位置左偏移一个单位的位置上的数组 (tgt_x, tgt_y)。然后将这些值强制转换为 np.float32并存储在变量中 tgt_L
tgt_R = tgt[tgt_x, tgt_y + 1].astype(np.float32) # 从 tgt指定的原始位置右偏移一个单位的位置上的数组 (tgt_x, tgt_y)。然后将这些值强制转换为 np.float32并存储在变量中 tgt_R
# 根据源图像和目标图像的像素值计算梯度 grad,使用 self.mixgrad 方法混合不同梯度类型的值
grad = self.mixgrad(src_C - src_L, tgt_C - tgt_L) \
+ self.mixgrad(src_C - src_R, tgt_C - tgt_R) \
+ self.mixgrad(src_C - src_U, tgt_C - tgt_U) \
+ self.mixgrad(src_C - src_D, tgt_C - tgt_D)
# 创建了三个矩阵:A、X和B,用于存储线性方程组的系数和常数项
A = np.zeros((max_id, 4), np.int32) # 创建一个NumPy数组 A与尺寸 (max_id, 4)和数据类型 np.int32.数组用零初始化
X = np.zeros((max_id, 3), np.float32) # 创建一个NumPy数组 X与尺寸 (max_id, 3)和数据类型 np.float32.数组用零初始化
B = np.zeros((max_id, 3), np.float32) # 创建一个NumPy数组 B与尺寸 (max_id, 3)和数据类型 np.float32.数组用零初始化
# 目标图像中对应位置的像素值赋值给 X 矩阵中的一部分
X[1:] = tgt[index_x + mask_on_tgt[0], index_y + mask_on_tgt[1]]
# four-way
A[1:, 0] = ids[index_x - 1, index_y] # 将 ids[index_x - 1, index_y]的值赋给 A数组第一列(除去第一行)
A[1:, 1] = ids[index_x + 1, index_y] # ids[index_x + 1, index_y]的值赋给 A数组第二列(除去第一行)
A[1:, 2] = ids[index_x, index_y - 1] # 将 ids[index_x, index_y - 1]的值赋给 A数组第三列(除去第一行)
A[1:, 3] = ids[index_x, index_y + 1] # 将 ids[index_x, index_y + 1]的值赋给 A数组第四列(除去第一行)
B[1:] = grad # 将 grad数组的值赋给 B数组的第二行及之后的所有行
m = (mask[index_x - 1, index_y] == 0).astype(float).reshape(-1, 1) # 根据 mask数组中指定索引位置的值是否为0,生成一个布尔型数组,并将其转换为浮点型数组,然后将其形状调整为列向量
B[1:] += m * tgt[index_x + mask_on_tgt[0] - 1, index_y + mask_on_tgt[1]] # 将 tgt数组中指定索引位置的值与上述生成的布尔型数组相乘,并将结果加到 B数组的第二行及之后的所有行上
m = (mask[index_x, index_y - 1] == 0).astype(float).reshape(-1, 1) # 生成一个布尔型数组,并将其转换为浮点型数组,然后将其形状调整为列向量
B[1:] += m * tgt[index_x + mask_on_tgt[0], index_y + mask_on_tgt[1] - 1] # 将 tgt数组中指定索引位置的值与上述生成的布尔型数组相乘,并将结果加到 B数组的第二行及之后的所有行上
m = (mask[index_x, index_y + 1] == 0).astype(float).reshape(-1, 1) # 生成一个布尔型数组,并将其转换为浮点型数组,然后将其形状调整为列向量
B[1:] += m * tgt[index_x + mask_on_tgt[0], index_y + mask_on_tgt[1] + 1] # 将 tgt数组中指定索引位置的值与上述生成的布尔型数组相乘,并将结果加到 B数组的第二行及之后的所有行上
m = (mask[index_x + 1, index_y] == 0).astype(float).reshape(-1, 1) # 根据mask数组中指定索引位置的值是否为0,生成一个布尔型数组,并将其转换为浮点型数组,然后将其形状调整为列向量
B[1:] += m * tgt[index_x + mask_on_tgt[0] + 1, index_y + mask_on_tgt[1]] # 将tgt数组中指定索引位置的值与上述生成的布尔型数组相乘,并将结果加到B数组的第二行及之后的所有行上
self.tgt = tgt.copy() # 将变量tgt进行深拷贝,赋值给self.tgt,避免对原始数据的修改
self.tgt_index = (index_x + mask_on_tgt[0], index_y + mask_on_tgt[1]) # 计算出一个元组,并将其赋值给self.tgt_index。这个元组表示了目标索引的位置
self.core.reset(max_id, A, X, B) # 调用self.core对象的reset方法,并传递参数max_id、A、X和B
return max_id # 返回变量max_id
# 该方法用于执行迭代步骤
def step(self, iteration: int) -> Optional[Tuple[np.ndarray, np.ndarray]]:
result = self.core.step(iteration) # 将计算结果中的修复后的像素值x更新到目标图像中的相应位置
if self.root: # 如果当前处理器是根节点
x, err = result # 回修复后的图像和误差
self.tgt[self.tgt_index] = x[1:] # 将x中从第二个元素开始的部分(即x[1:])赋值给self.tgt数组中的指定索引位置self.tgt_index。这样做是将修复后的像素值更新到目标图像中的相应位置
return self.tgt, err # 返回self.tgt和err作为函数或方法的结果,以元组的形式返回
return None # 如果当前处理器不是根节点,则返回None
# 该类继承自BaseProcessor类,用于执行PIE算法中的网格处理器
class GridProcessor(BaseProcessor):
"""PIE grid processor."""
def __init__(
self,
gradient: str = "max", # 表示梯度类型的字符串
backend: str = DEFAULT_BACKEND, # 表示所使用的后端的字符串
n_cpu: int = CPU_COUNT, # 表示使用的 CPU 核心数量
min_interval: int = 100, # 表示MPI最小间隔的整数
block_size: int = 1024, # 表示CUDA中的块大小的整数
grid_x: int = 8, # 表示网格在x轴上的数量的整数,默认为8
grid_y: int = 8, # 表示网格在y轴上的数量的整数,默认为8
):
core: Optional[Any] = None
rank = 0
# 根据后端的不同选择具体的核心处理对象,并将其传递给基类 BaseProcessor 进行初始化
if backend == "numpy": # 如果 backend 是 "numpy",
core = np_solver.GridSolver() # 则创建一个 np_solver.GridSolver() 对象,并赋值给变量 core
elif backend == "numba" and numba_solver is not None: # 如果 backend 是 "numba" 并且 numba_solver 不为空
core = numba_solver.GridSolver() # 创建一个 numba_solver.GridSolver() 对象,并赋值给变量 core
elif backend == "gcc": # 如果 backend 是 "gcc",
core = core_gcc.GridSolver(grid_x, grid_y) # 则创建一个 core_gcc.GridSolver(grid_x, grid_y) 对象,并赋值给变量 core
elif backend == "openmp" and core_openmp is not None: # 如果 backend 是 "openmp" 并且 core_openmp 不为空
core = core_openmp.GridSolver(grid_x, grid_y, n_cpu) # 则创建一个 core_openmp.GridSolver(grid_x, grid_y, n_cpu) 对象,并赋值给变量 core
elif backend == "mpi" and core_mpi is not None: # 如果 backend 是 "mpi" 并且 core_mpi 不为空
core = core_mpi.GridSolver(min_interval) # 则创建一个 core_mpi.GridSolver(min_interval) 对象,并赋值给变量 core
rank = MPI.COMM_WORLD.Get_rank() # 同时,获取当前进程的排名并赋值给变量 rank
elif backend == "cuda" and core_cuda is not None: # 如果 backend 是 "cuda" 并且 core_cuda 不为空
core = core_cuda.GridSolver(grid_x, grid_y) # 则创建一个 core_cuda.GridSolver(grid_x, grid_y) 对象,并赋值给变量 core
elif backend.startswith("taichi") and taichi_solver is not None: # 如果 backend 以 "taichi" 开头并且 taichi_solver 不为空
core = taichi_solver.GridSolver( # 创建一个 taichi_solver.GridSolver(grid_x, grid_y, backend, n_cpu, block_size) 对象,并赋值给变量 core
grid_x, grid_y, backend, n_cpu, block_size
)
super().__init__(gradient, rank, backend, core) # 调用基类 BaseProcessor 的初始化方法,传递梯度、排名、后端和核心处理对象进行初始化
def reset(
self,
src: np.ndarray, # 表示源图像的numpy数组
mask: np.ndarray, # 表示掩码图像的numpy数组
tgt: np.ndarray, # 表示目标图像的numpy数组
mask_on_src: Tuple[int, int], # 表示在源图像上的掩码位置的元组
mask_on_tgt: Tuple[int, int], # 表示在目标图像上的掩码位置的元组
) -> int:
assert self.root # 断言当前处理器是根节点。如果不是根节点,则会引发异常
# check validity
# assert 0 <= mask_on_src[0] and 0 <= mask_on_src[1]
# assert mask_on_src[0] + mask.shape[0] <= src.shape[0]
# assert mask_on_src[1] + mask.shape[1] <= src.shape[1]
# assert mask_on_tgt[0] + mask.shape[0] <= tgt.shape[0]
# assert mask_on_tgt[1] + mask.shape[1] <= tgt.shape[1]
if len(mask.shape) == 3: # 检查掩码图像的维度是否为3,即是否为彩色图像 如果是彩色图像,则执行下一步操作
mask = mask.mean(-1) # 对彩色图像的最后一个维度进行求平均操作,将其转换为灰度图像。这样做是为了将彩色图像转换为单通道的灰度图像
mask = (mask >= 128).astype(np.int32) # 将灰度图像中大于等于128的像素值设置为1,小于128的像素值设置为0,并将结果转换为np.int32类型
# 对边缘进行零填充操作,将掩码图像的四个边界位置都设置为 0
# zero-out edge
mask[0] = 0 # 将掩码图像的第一行(上边界)的所有元素设置为0
mask[-1] = 0 # 将掩码图像的最后一行(下边界)的所有元素设置为0
mask[:, 0] = 0 # 将掩码图像的第一列(左边界)的所有元素设置为0
mask[:, -1] = 0 # 将掩码图像的最后一列(右边界)的所有元素设置为0
x, y = np.nonzero(mask) # 找到掩码图像中非零元素的索引坐标,并将其分别保存在x和y中
x0, x1 = x.min() - 1, x.max() + 2 # 计算变量 x 的最小值减去1,得到 x0;计算变量 x 的最大值加上2,得到 x1
y0, y1 = y.min() - 1, y.max() + 2 # 计算变量 y 的最小值减去1,得到 y0;计算变量 y 的最大值加上2,得到 y1
mask = mask[x0:x1, y0:y1] # 根据计算出的索引范围,从原始的掩码图像中提取相应区域的子图像,更新mask变量
max_id = np.prod(mask.shape) # 得到最大 ID 值
# 用于计算修复区域的梯度
# 从源图像 src 和目标图像 tgt 中根据掩码位置偏移和索引范围裁剪出相应区域的子图像,并将它们转换为 np.float32 类型
src_crop = src[mask_on_src[0] + x0:mask_on_src[0] + x1,
mask_on_src[1] + y0:mask_on_src[1] + y1].astype(np.float32)
tgt_crop = tgt[mask_on_tgt[0] + x0:mask_on_tgt[0] + x1,
mask_on_tgt[1] + y0:mask_on_tgt[1] + y1].astype(np.float32)
grad = np.zeros([*mask.shape, 3], np.float32)
grad[1:] += self.mixgrad( # 表示下方向的梯度
src_crop[1:] - src_crop[:-1], tgt_crop[1:] - tgt_crop[:-1]
)
grad[:-1] += self.mixgrad( # 表示上方向的梯度
src_crop[:-1] - src_crop[1:], tgt_crop[:-1] - tgt_crop[1:]
)
grad[:, 1:] += self.mixgrad( # 表示右方向的梯度
src_crop[:, 1:] - src_crop[:, :-1], tgt_crop[:, 1:] - tgt_crop[:, :-1]
)
grad[:, :-1] += self.mixgrad( # 表示左方向的梯度
src_crop[:, :-1] - src_crop[:, 1:], tgt_crop[:, :-1] - tgt_crop[:, 1:]
)
grad[mask == 0] = 0 # 将 grad 中掩码为 0 的位置的梯度值设置为 0。
# 这样可以确保在修复过程中,只有在掩码区域内的像素才会受到梯度的影响,而在掩码区域外的像素则不受影响
self.x0 = mask_on_tgt[0] + x0 # 将 mask_on_tgt[0] 和 x0 相加的结果赋值给 self.x0。这个操作是为了计算目标图像中裁剪区域的左上角在整个图像中的位置
self.x1 = mask_on_tgt[0] + x1 # 将 mask_on_tgt[0] 和 x1 相加的结果赋值给 self.x1。这个操作是为了计算目标图像中裁剪区域的右下角在整个图像中的位置
self.y0 = mask_on_tgt[1] + y0 # 将 mask_on_tgt[1] 和 y0 相加的结果赋值给 self.y0。这个操作是为了计算目标图像中裁剪区域的左上角在整个图像中的位置
self.y1 = mask_on_tgt[1] + y1 # 将 mask_on_tgt[1] 和 y1 相加的结果赋值给 self.y1。这个操作是为了计算目标图像中裁剪区域的右下角在整个图像中的位置
self.tgt = tgt.copy() # 将变量 tgt 进行深拷贝,赋值给 self.tgt。这样做是为了避免对原始数据的修改
self.core.reset(max_id, mask, tgt_crop, grad) # 用 self.core 对象的 reset 方法,并传递参数 max_id、mask、tgt_crop 和 grad。对 self.core 对象进行一些重置操作
return max_id # 返回变量 max_id
def step(self, iteration: int) -> Optional[Tuple[np.ndarray, np.ndarray]]:
result = self.core.step(iteration) # 调用 self.core 对象的 step 方法,并传递参数 iteration
if self.root: # 如果当前处理器是根节点
tgt, err = result # 将计算结果中的修复后的像素值tgt更新到目标图像self.tgt中的相应位置
self.tgt[self.x0:self.x1, self.y0:self.y1] = tgt
return self.tgt, err # 返回修复后的图像和误差
return None
2.8 taichi_solver.py
实现基于 Taichi 的 Jacobi 方法,用于求解线性方程组和执行图像修复任务。 Jacobi 方法被应用于网格上,通过迭代更新目标图像的值,并计算误差,以实现对图像的修复。
from typing import Tuple # 从 typing 模块中导入 Tuple 类型,用于定义元组类型的注解
import numpy as np # 导入 numpy 库,并将其命名为 np,用于进行科学计算和数组操作
import taichi as ti # 导入 taichi 库,并将其命名为 ti,用于进行物理仿真和图形计算
@ti.data_oriented
class EquSolver(object):
"""Taichi-based Jacobi method equation solver implementation."""
# 初始化方法,接受后端类型、CPU核心数量和块大小等参数
def __init__(self, backend: str, n_cpu: int, block_size: int) -> None:
super().__init__()
self.parallelize = n_cpu # 并行化标志,表示使用的 CPU 核心数量
self.block_dim = block_size # 块大小,用于指定 Tiachi 并行计算的块维度
ti.init(arch=getattr(ti, backend.split("-")[-1])) # 初始化Taichi环境,指定所使用的后端类型和架构
self.N = 0 # 线性方程组的大小,默认为 0
self.fb: ti.FieldsBuilder # Taichi中用于构建字段的辅助对象
self.fbst: ti._snode.snode_tree.SNodeTree # Taichi中用于构建字段的辅助对象
# Taichi 字段,分别用于存储误差、系数矩阵、常数项、未知变量和临时变量
self.terr = ti.field(ti.f32, (3,)) # 创建一个名为 terr 的 Taichi 字段,存储误差,数据类型为 ti.f32(单精度浮点数),形状为 (3,)
self.tA = ti.field(ti.i32) # 创建一个名为 tA 的 Taichi 字段,存储系数矩阵,数据类型为 ti.i32(32位整数)
self.tB = ti.field(ti.f32) # 创建一个名为 tB 的 Taichi 字段,存储常数项,数据类型为 ti.f32
self.tX = ti.field(ti.f32) # 创建一个名为 tX 的 Taichi 字段,存储未知变量,数据类型为 ti.f32
self.tmp = ti.field(ti.f32) # 创建一个名为 tmp 的 Taichi 字段,存储临时变量,数据类型为 ti.f32
def partition(self, mask: np.ndarray) -> np.ndarray: # 这个方法接受一个名为 mask 的 numpy 数组作为参数,并返回一个 numpy 数组
return np.cumsum((mask > 0).reshape(-1)).reshape(mask.shape)
# 调用这个方法,可以设置线性方程组的大小、系数矩阵、未知变量的初始值和常数项,并初始化Taichi字段
def reset(self, N: int, A: np.ndarray, X: np.ndarray, B: np.ndarray) -> None:
"""(4 - A)X = B"""
self.N = N # 将参数 N 的值赋给实例变量 N,表示线性方程组的大小
self.A = A # 将参数 A 的值赋给实例变量 A,表示系数矩阵
self.B = B # 将参数 B 的值赋给实例变量 B,表示常数项
self.X = X # 将参数 X 的值赋给实例变量 X,表示未知变量
if hasattr(self, "fbst"): # 检查是否已经存在Taichi字段构建器 fbst
self.fbst.destroy() # 存在则销毁它,并重新创建tAtB、tX和tmp字段
self.tA = ti.field(ti.i32) # 重新创建 tA 字段,数据类型为 ti.i32
self.tB = ti.field(ti.f32) # 重新创建 tB 字段,数据类型为 ti.f32
self.tX = ti.field(ti.f32) # 重新创建 tX 字段,数据类型为 ti.f32
self.tmp = ti.field(ti.f32) # 重新创建 tmp 字段,数据类型为 ti.f32
self.fb = ti.FieldsBuilder() # 创建一个新的字段构建器对象
self.fb.dense(ti.ij, (N, 4)).place(self.tA) # 使用字段构建器将 tA 字段放置在二维网格 (N, 4) 上
self.fb.dense(ti.ij, (N, 3)).place(self.tB) # 使用字段构建器将 tB 字段放置在二维网格 (N, 3) 上
self.fb.dense(ti.ij, (N, 3)).place(self.tX) # 使用字段构建器将 tX 字段放置在二维网格 (N, 3) 上
self.fb.dense(ti.ij, (N, 3)).place(self.tmp) # 使用字段构建器将 tmp 字段放置在二维网格 (N, 3) 上
self.fbst = self.fb.finalize() # 使用字段构建器的 finalize() 方法完成字段的创建和布局
self.tA.from_numpy(A) # 参数 A 的值从 numpy 数组转换为 Taichi 字段
self.tB.from_numpy(B) # 将参数 B 的值从 numpy 数组转换为 Taichi 字段
self.tX.from_numpy(X) # 将参数 X 的值从 numpy 数组转换为 Taichi 字段
self.tmp.from_numpy(X) # 将参数 X 的值从 numpy 数组转换为 Taichi 字段
def sync(self) -> None: # 用于同步操作
pass
@ti.kernel # 定义了一个核函数 iter_kernel,用于执行 Jacobi 迭代方
def iter_kernel(self) -> int: # 定义了 iter_kernel 方法,用于执行Jacobi迭代方法的核函数
ti.loop_config(parallelize=self.parallelize, block_dim=self.block_dim) # 配置并行化和块大小参数
for i in range(1, self.N): # 对于 i 在范围从 1 到 self.N 的循环迭代
i0, i1 = self.tA[i, 0], self.tA[i, 1] # 将 self.tA[i, 0] 和 self.tA[i, 1] 的值分别赋给 i0 和 i1
i2, i3 = self.tA[i, 2], self.tA[i, 3] # 将 self.tA[i, 2] 和 self.tA[i, 3] 的值分别赋给 i2 和 i3
# X = (B + AX) / 4
self.tX[i, 0] = (
self.tB[i, 0] + self.tX[i0, 0] + self.tX[i1, 0] + self.tX[i2, 0] +
self.tX[i3, 0]
) / 4.0 # 根据 Jacobi 迭代方法的公式,计算 tX[i, 0] 的值,并将结果赋给它
self.tX[i, 1] = (
self.tB[i, 1] + self.tX[i0, 1] + self.tX[i1, 1] + self.tX[i2, 1] +
self.tX[i3, 1]
) / 4.0 # 根据 Jacobi 迭代方法的公式,计算 tX[i, 1] 的值,并将结果赋给它
self.tX[i, 2] = (
self.tB[i, 2] + self.tX[i0, 2] + self.tX[i1, 2] + self.tX[i2, 2] +
self.tX[i3, 2]
) / 4.0 # 根据 Jacobi 迭代方法的公式,计算 tX[i, 2] 的值,并将结果赋给它
return 0
@ti.kernel
def error_kernel(self) -> int: # 该方法用于计算误差
ti.loop_config(parallelize=self.parallelize, block_dim=self.block_dim) # 配置并行化和块大小参数
for i in range(1, self.N):
i0, i1 = self.tA[i, 0], self.tA[i, 1]
i2, i3 = self.tA[i, 2], self.tA[i, 3]
self.tmp[i, 0] = ti.abs(
self.tB[i, 0] + self.tX[i0, 0] + self.tX[i1, 0] + self.tX[i2, 0] +
self.tX[i3, 0] - 4.0 * self.tX[i, 0]
)
self.tmp[i, 1] = ti.abs(
self.tB[i, 1] + self.tX[i0, 1] + self.tX[i1, 1] + self.tX[i2, 1] +
self.tX[i3, 1] - 4.0 * self.tX[i, 1]
)
self.tmp[i, 2] = ti.abs(
self.tB[i, 2] + self.tX[i0, 2] + self.tX[i1, 2] + self.tX[i2, 2] +
self.tX[i3, 2] - 4.0 * self.tX[i, 2]
)
self.terr[0] = self.terr[1] = self.terr[2] = 0
ti.loop_config(parallelize=self.parallelize, block_dim=self.block_dim)
for i, j in self.tmp:
self.terr[j] += self.tmp[i, j]
return 0
# 用于执行迭代步骤
def step(self, iteration: int) -> Tuple[np.ndarray, np.ndarray]:
for _ in range(iteration): # 指定次数的迭代
self.iter_kernel() # 更新未知变量的值
self.error_kernel() # 计算最终的误差
x = self.tX.to_numpy()
x[x < 0] = 0
x[x > 255] = 255 # 保像素值范围在 0 到 255 之间
return x, self.terr.to_numpy()
@ti.data_oriented
class GridSolver(object):
"""Taichi-based Jacobi method grid solver implementation."""
def __init__(
self, grid_x: int, grid_y: int, backend: str, n_cpu: int, block_size: int
) -> None:
super().__init__()
self.N = 0
self.grid_x = grid_x # 网格在x轴的数量
self.grid_y = grid_y # 网格在y轴的数量
self.parallelize = n_cpu # CPU 核心数量,用于并行化计算
self.block_dim = block_size # 块大小,用于指定Taichi并行计算的块维度
ti.init(arch=getattr(ti, backend.split("-")[-1])) # 始化Taichi环境,指定所使用的后端类型和架构
self.fb: ti.FieldsBuilder
self.fbst: ti._snode.snode_tree.SNodeTree
self.terr = ti.field(ti.f32, (3,))
self.tmask = ti.field(ti.i32)
self.ttgt = ti.field(ti.f32)
self.tgrad = ti.field(ti.f32)
self.tmp = ti.field(ti.f32)
# 该方法用于重置GridSolver对象的状态
# 在方法内部,首先根据网格的数量和掩码的形状计算出新的修复区域的大小,并保存原始的修复区域大小
def reset( # reset方法的另一部分,用于重新创建和初始化Taichi字段
self, N: int, mask: np.ndarray, tgt: np.ndarray, grad: np.ndarray
) -> None:
gx, gy = self.grid_x, self.grid_y
self.orig_N, self.orig_M = N, M = mask.shape # 首先计算修复区域的大小,并将其保存为N和M的值
pad_x = 0 if N % gx == 0 else gx - (N % gx)
pad_y = 0 if M % gy == 0 else gy - (M % gy)
if pad_x or pad_y: # 如果修复区域的大小不能被网格块的大小整除
mask = np.pad(mask, [(0, pad_x), (0, pad_y)]) # 则使用np.pad函数对数组进行填充
tgt = np.pad(tgt, [(0, pad_x), (0, pad_y), (0, 0)])
grad = np.pad(grad, [(0, pad_x), (0, pad_y), (0, 0)])
self.N, self.M = N, M = mask.shape
bx, by = N // gx, M // gy
self.mask = mask
self.tgt = tgt
self.grad = grad
if hasattr(self, "fbst"): # 首先检查是否已经存在Taichi字段fbst
self.fbst.destroy() # 如果存在,则调用destroy方法销毁它
self.tmask = ti.field(ti.i32) # 并重新创建以下字段
self.ttgt = ti.field(ti.f32)
self.tgrad = ti.field(ti.f32)
self.tmp = ti.field(ti.f32)
self.fb = ti.FieldsBuilder() # 使用Taichi字段构建器fb创建字段
# 通过多次调用 dense 方法可以创建多级的嵌套字段
self.fb.dense(ti.ij, (bx, by)).dense(ti.ij, (gx, gy)).place(self.tmask)
self.fb.dense(ti.ij, (bx, by)).dense(ti.ij, (gx, gy)) \
.dense(ti.k, 3).place(self.ttgt)
self.fb.dense(ti.ij, (bx, by)).dense(ti.ij, (gx, gy)) \
.dense(ti.k, 3).place(self.tgrad)
self.fb.dense(ti.ij, (bx, by)).dense(ti.ij, (gx, gy)) \
.dense(ti.k, 3).place(self.tmp)
self.fbst = self.fb.finalize()
self.tmask.from_numpy(mask)
self.ttgt.from_numpy(tgt)
self.tgrad.from_numpy(grad)
self.tmp.from_numpy(grad)
def sync(self) -> None:
pass
@ti.kernel
def iter_kernel(self) -> int: # 用于执行网格求解中的 Jacobi 迭代步骤
ti.loop_config(parallelize=self.parallelize, block_dim=self.block_dim) # 配置并行化和块大小参数
for i, j in self.tmask: # 于每个索引 (i, j),根据Jacobi迭代公式计算出目标图像ttgt在当前像素位置的更新值
if self.tmask[i, j] > 0:
# tgt = (grad + Atgt) / 4
self.ttgt[i, j, 0] = (
self.tgrad[i, j, 0] + self.ttgt[i - 1, j, 0] + self.ttgt[i, j - 1, 0]
+ self.ttgt[i, j + 1, 0] + self.ttgt[i + 1, j, 0]
) / 4.0
self.ttgt[i, j, 1] = (
self.tgrad[i, j, 1] + self.ttgt[i - 1, j, 1] + self.ttgt[i, j - 1, 1]
+ self.ttgt[i, j + 1, 1] + self.ttgt[i + 1, j, 1]
) / 4.0
self.ttgt[i, j, 2] = (
self.tgrad[i, j, 2] + self.ttgt[i - 1, j, 2] + self.ttgt[i, j - 1, 2]
+ self.ttgt[i, j + 1, 2] + self.ttgt[i + 1, j, 2]
) / 4.0
return 0
@ti.kernel
def error_kernel(self) -> int: # 用于计算误差的核函数
ti.loop_config(parallelize=self.parallelize, block_dim=self.block_dim) # 配置并行化和块大小参数
for i, j in self.tmask: # 使用嵌套的for循环遍历tmask字段中非零值的索引 (i, j)些索引表示修复区域中需要进行计算误差的像素
if self.tmask[i, j] > 0: # 首先判断当前像素位置 (i, j) 是否在修复区域内
self.tmp[i, j, 0] = ti.abs( # 如果在修复区域内,则根据误差计算公式计算每个颜色通道的误差值
self.tgrad[i, j, 0] + self.ttgt[i - 1, j, 0] +
self.ttgt[i, j - 1, 0] + self.ttgt[i, j + 1, 0] +
self.ttgt[i + 1, j, 0] - 4.0 * self.ttgt[i, j, 0]
)
self.tmp[i, j, 1] = ti.abs(
self.tgrad[i, j, 1] + self.ttgt[i - 1, j, 1] +
self.ttgt[i, j - 1, 1] + self.ttgt[i, j + 1, 1] +
self.ttgt[i + 1, j, 1] - 4.0 * self.ttgt[i, j, 1]
)
self.tmp[i, j, 2] = ti.abs(
self.tgrad[i, j, 2] + self.ttgt[i - 1, j, 2] +
self.ttgt[i, j - 1, 2] + self.ttgt[i, j + 1, 2] +
self.ttgt[i + 1, j, 2] - 4.0 * self.ttgt[i, j, 2]
)
else: # 如果不在修复区域内(即掩码为零),则将误差值设置为0
self.tmp[i, j, 0] = self.tmp[i, j, 1] = self.tmp[i, j, 2] = 0.0
self.terr[0] = self.terr[1] = self.terr[2] = 0
ti.loop_config(parallelize=self.parallelize, block_dim=self.block_dim)
for i, j, k in self.tmp:
self.terr[k] += self.tmp[i, j, k]
return 0
# 用于执行迭代步骤并返回结果
def step(self, iteration: int) -> Tuple[np.ndarray, np.ndarray]:
for _ in range(iteration):
self.iter_kernel() # 更新目标图像的值
self.error_kernel() # 计算最终的误差
tgt = self.ttgt.to_numpy()[:self.orig_N, :self.orig_M] # 将目标图像ttgt转换为NumPy数组
tgt[tgt < 0] = 0
tgt[tgt > 255] = 255 # 确保像素值范围在 0 到 255 之间
return tgt, self.terr.to_numpy() # 返回修复后的目标图像数组 tgt 和误差数组
3. 算法总结
Fast Poisson Image Editing 是一个用于图像编辑的算法,它的核心思想是通过求解泊松方程来进行图像修复。
4. 代码运行
4.1 测试
(1)由data.py代码中download函数可以知道如何通过链接下载图片并指定文件名。
(2)通过操作可以在data文件下得到三张图:
(3) 输入命令行:
fpie -s test3_src.jpg -m test3_mask.jpg -t test3_tgt.jpg -o result3.jpg -h1 100 -w1 100 -n 5000 -g max
(4)运行后得到结果图:
以下图片为我们随机在网上找的图片,并进行编辑,非自带图片
4.2 基于GUI后端自定义框输出编辑图像结果
4.2.1 下载open-cv
pip install opencv-python
4.2.2 输入命令
fpie-gui -s test1_src.jpg -t test1_target.jpg -o result.jpg -b taichi-cpu -n 10000
以下图片为我们随机在网上找的图片,并进行编辑,非自带图片。
4.2.3 自定义框
在source中框出物品,再在target中点击要嵌入的位置,最后result会产生结果。