文档地址: Pipeline — NVIDIA DALI 1.12.0 documentation
在DALI中,任何数据处理任务都有一个称为Pipeline的中心对象。Pipeline对象nvidia.dali.Pipeline或其派生类的实例。Pipeline封装了数据处理图和执行引擎。
您可以通过以下方式定义DALI管道:
- 通过实现一个内部使用DALI操作符的函数,并使用pipeline_def()装饰器修饰。
- 直接实例化Pipeline对象,构建图并使用Pipeline.set_outputs()设置管道输出。
- 通过继承Pipeline类并重写Pipeline.define_graph()方法(这是定义DALI管道的传统方式)。
数据处理图
DALI管道表示为一组操作的图形。图中有两种类型的节点:
- 操作符:每次调用操作符时都会创建
- 数据节点(参见DataNode):表示操作符的输出和输入;它们从操作符调用中返回,并通过将它们作为输入传递给其他操作符来在图中建立连接。
数据节点可以通过调用操作符函数进行转换。它们还支持Python风格的索引,可以嵌入到数学表达式中。
示例:
@pipeline_def # create a pipeline with processing graph defined by the function below
def my_pipeline():
""" Create a pipeline which reads images and masks, decodes the images and returns them. """
img_files, labels = fn.readers.file(file_root="image_dir", seed=1)
mask_files, _ = fn.readers.file(file_root="mask_dir", seed=1)
images = fn.decoders.image(img_files, device="mixed")
masks = fn.decoders.image(mask_files, device="mixed")
return images, masks, labels
pipe = my_pipeline(batch_size=4, num_threads=2, device_id=0)
pipe.build()
生成的图形是:
处理图结构
DALI管道是分阶段执行的。这些阶段对应于可以为操作符指定的设备参数,并按照以下顺序执行:
- 'cpu' - 接受CPU输入并产生CPU输出的操作符。
- 'mixed' - 接受CPU输入并产生GPU输出的操作符,例如nvidia.dali.fn.decoders.image()。
- 'gpu' - 接受GPU输入并产生GPU输出的操作符。
由CPU操作符生成的数据可能通过在DataNode(DALI操作符的输出)上调用.gpu()来显式复制到GPU。
在较早阶段执行的操作符无法消耗由后续阶段生成的数据。
大多数DALI操作符接受额外的关键字参数用于参数化它们的行为。这些命名关键字参数(与位置输入不同)可以是:
- Python常量
- 参数输入 - CPU操作符的输出,在操作符的文档字符串中表示为TensorList。
对于参数输入,将一个操作符的输出作为其他操作符的命名关键字参数传递将在处理图中建立连接。
这些参数将作为DALI管道图的一部分在每次迭代和每个样本上计算。请记住,只有CPU操作符可以用作参数输入。
示例:
@pipeline_def
def my_pipeline():
img_files, labels = fn.readers.file(file_root='image_dir', device='cpu')
# `images` is GPU data (result of Mixed operator)
images = fn.decoders.image(img_files, device='mixed')
# `coin_flip` must be on CPU so the `flip_params` can be used as argument input
flip_param = fn.random.coin_flip(device='cpu')
# `images` is input (GPU) and `flip_param` is argument input (CPU)
flipped = fn.flip(images, horizontal=flip_param, device='gpu')
# `labels` is explicitly marked for transfer to GPU, `flipped` is already GPU
return flipped, labels.gpu()
pipe = my_pipeline(batch_size=4, num_threads=2, device_id=0)
pipe.build()
note
如果未指定设备参数,则会根据输入的放置自动选择。如果至少有一个GPU输入,则假定设备='gpu',否则使用'cpu'。
上面的示例为了清晰起见明确添加了设备参数,但如果只为fn.decoders.image指定了device='mixed',它也将起同样的作用。
当前管道
不会对不贡献到管道输出的子图进行自动修剪。如果一个操作符具有副作用(例如PythonFunction操作符系列),则不能在没有设置当前管道的情况下调用它。当前管道在定义图形在派生管道的Pipeline.define_graph()方法中时会被隐式设置。否则,可以使用上下文管理器(with语句)进行设置:
pipe = dali.Pipeline(batch_size=N, num_threads=3, device_id=0)
with pipe:
src = dali.ops.ExternalSource(my_source, num_outputs=2)
a, b = src()
pipe.set_outputs(a, b)
当使用pipeline_def()创建管道时,定义管道的函数会在新创建的管道的范围内执行。下面的示例等同于前面的示例:
@dali.pipeline_def(batch_size=N, num_threads=3, device_id=0)
def my_pipe(my_source):
return dali.fn.external_source(my_source, num_outputs=2)
pipe = my_pipe(my_source)
class nvidia.dali.Pipeline(batch_size=- 1, num_threads=- 1, device_id=- 1, seed=- 1, exec_pipelined=True, prefetch_queue_depth=2, exec_async=True, bytes_per_sample=0, set_affinity=False, max_streams=- 1, default_cuda_stream_priority=0, *, enable_memory_stats=False, py_num_workers=1, py_start_method='fork', py_callback_pickler=None)¶
Pipeline 类是所有DALI数据管道的基类。该管道封装了数据处理图和执行引擎。
参数:
- batch_size (int, optional, default = -1) –
Pipeline 的最大批处理大小。该参数的负值是无效的,只有在使用序列化的 pipeline 时才能使用默认值(使用序列化 pipeline 中存储的值)。在大多数情况下,pipeline 的实际批处理大小将等于最大批处理大小。还支持以较小的批处理大小运行 DALI Pipeline。批处理大小可能会在迭代之间发生变化。
请注意,DALI 可能会根据该参数执行内存预分配。设置得太高可能导致内存溢出。 - num_threads (int, optional, default = -1) –
Pipeline 使用的 CPU 线程数。该参数的负值是无效的,只有在使用序列化的 pipeline 时才能使用默认值(使用序列化 pipeline 中存储的值)。 - device_id (int, optional, default = -1) –
Pipeline 使用的 GPU 的 ID。该参数的 None 值表示 DALI 不应使用 GPU 或 CUDA 运行时。这将限制 pipeline 仅使用 CPU 操作符,但允许其在任何支持 CPU 的机器上运行。 - seed (int, optional, default = -1) –
用于随机数生成的种子。留下该参数的默认值将产生随机种子。 - exec_pipelined (bool, optional, default = True) –
是否以一种使 CPU 和 GPU 计算重叠的方式执行 pipeline,通常会导致更快的执行速度,但会消耗更多内存。 - prefetch_queue_depth (int or {"cpu_size": int, "gpu_size": int}, optional, default = 2) –
执行器 pipeline 的深度。更深的 pipeline 使 DALI 更能抵抗每个批次的不均匀执行时间,但也会为内部缓冲区消耗更多内存。指定一个字典: { "cpu_size": x, "gpu_size": y } 而不是一个整数将导致 pipeline 使用分隔队列执行器,cpu 阶段的缓冲队列大小为 x,混合和 gpu 阶段的为 y。当 exec_async 和 exec_pipelined 都设置为 False 时,不支持这种用法。执行器将分别缓冲 cpu 和 gpu 阶段,并在发出第一个 run() 时填充缓冲队列。 - exec_async (bool, optional, default = True) –
是否异步执行 pipeline。这使得 run() 方法与调用 Python 线程异步运行。为了与 pipeline 同步,需要调用 outputs() 方法。 - bytes_per_sample (int, optional, default = 0) –
对 DALI 提供其张量使用多少内存的提示。 - set_affinity (bool, optional, default = False) –
是否将 CPU 核心亲和性设置为最接近正在使用的 GPU 的核心。 - max_streams (int, optional, default = -1) –
限制执行器使用的 CUDA 流的数量。-1 的值不会强加限制。当前未使用该参数(假设行为不受限制数量的流的影响)。 - default_cuda_stream_priority (int, optional, default = 0)
DALI 使用的 CUDA 流优先级。参见 CUDA 文档中的 cudaStreamCreateWithPriority。 - enable_memory_stats (bool, optional, default = 1) –
如果 DALI 应打印操作符输出缓冲区统计信息。对于 bytes_per_sample_hint 操作符参数很有用。 - py_num_workers (int, optional, default = 1) –
将处理 ExternalSource 回调的 Python 工作者数。仅当至少有一个 parallel 设置为 True 的 ExternalSource 时,池才会启动。将其设置为 0 会禁用池,并且所有 ExternalSource 操作符即使 parallel 设置为 True,也会退回到非并行模式。 - py_start_method (str, default = "fork") –
确定如何启动 Python 工作者。支持的方法:
"fork" - 通过 fork 进程启动
"spawn" - 启动一个全新的解释器进程
如果使用 spawn 方法,parallel ExternalSource 的回调必须是可 picklable 的。为了使用 fork,必须在启动工作者时没有获取 CUDA 上下文。因此,如果需要构建使用 Python 工作者的多个管道,您需要在调用任何管道的 build() 之前调用 start_py_workers()。您可以在 Python 的 multiprocessing 模块文档中找到有关两种方法的更多详细信息和注意事项。 - py_callback_pickler (module or tuple, default = None) –
如果 py_start_method 设置为 spawn,则传递给 parallel ExternalSource 的回调必须是可 picklable 的。如果在 Python3.8 或更新版本中运行,并且 py_callback_pickler 设置为 None,则 DALI 使用自定义 pickle 来序列化回调以支持本地函数和 lambda 的序列化。
但是,如果需要序列化更复杂的对象,例如本地类,或者运行较旧版本的 Python,则可以提供外部序列化包,例如 dill 或 cloudpickle,该包实现了 dumps 和 loads 两种方法,使 DALI 使用它们来序列化外部源回调。可以直接将模块传递给 py_callback_pickler:
py_callback_pickler的有效值可以是一个实现dumps和loads方法的模块/对象,或者是一个元组,其中第一个项目是模块/对象,下一个两个可选参数分别是在调用dumps和loads时要传递的额外kwargs。提供的方法和kwargs必须与标准pickle.dumps兼容。import dill @pipeline_def(py_callback_pickler=dill, ...) def create_pipeline(): src = fn.external_source(lambda sample_info: np.int32([42]), batch=False, parallel=True) ...
如果您使用Python3.8或更新版本,并且使用默认的DALI pickler(py_callback_pickler = None),您可以通过在全局函数上添加 @dali.pickling.pickle_by_value 装饰器来提示DALI按值而不是按引用序列化全局函数。当您在Jupyter笔记本中工作时,这可能尤其有用,以解决无法将在笔记本内作为全局函数定义的回调导入到工作进程的问题。
enter()¶
安全地将该管道设置为当前管道。必须将当前管道设置为调用具有副作用或没有输出的操作符。这些操作符的示例包括PythonFunction(可能有副作用)或DumpImage(无输出)。
任何悬空的操作符都可以标记为具有副作用,如果将其标记为preserve=True,则对于调试非常有用-否则不会从图中删除不会对管道输出产生影响的操作符。
要手动设置新的(并恢复之前的)当前管道,请分别使用push_current()和pop_current()。
exit(exception_type, exception_value, traceback)¶
安全地恢复之前的管道。
add_sink(edge)¶
允许手动向管道添加图边,这些图边未连接到输出并且已全部修剪。
property batch_size¶
批处理大小。
build(define_graph=None)¶
构建管道。
必须构建管道以便独立运行。特定于框架的插件会自动处理此步骤。
参数
define_graph (callable) –
如果指定了此函数,将使用此函数而不是成员define_graph()。如果使用set_outputs()指定了管道输出,或者使用了start_py_workers(),则不能设置此参数。
注意
无法与并行ExternalSource一起使用此方法定义处理图。
property cpu_queue_size¶
CPU 阶段提前处理的迭代次数。
static current()¶
返回由push_current()设置的当前管道实例。
property default_cuda_stream_priority¶
该管道使用的 CUDA 流的默认优先级。
define_graph()¶
用户定义此函数以构建其管道的操作图。
通过调用 DALI 操作符创建的输出的列表。
classmethod deserialize(serialized_pipeline=None, filename=None, **kwargs)¶
反序列化并构建管道。
反序列化先前使用serialize()方法序列化的管道。
返回的管道已经构建。
或者,可以传递额外的参数,这些参数在实例化管道时将被使用。有关参数的完整列表,请参考Pipeline构造函数。默认情况下,管道将使用序列化管道的参数实例化。
请注意,serialized_pipeline 和 filename 参数是互斥的。
参数
serialized_pipeline (str) – 使用serialize()方法序列化的管道。
filename (str) – 将从中读取序列化管道的文件。
kwargs (dict) – 有关参数的完整列表,请参考Pipeline构造函数。
返回值
Return type
反序列化和构建的管道。
deserialize_and_build(serialized_pipeline)¶
反序列化并构建以序列化形式给出的流水线。
参数
serialized_pipeline (str) – 序列化的流水线。
property device_id¶
流水线使用的 GPU 的 ID,对于仅支持 CPU 的流水线则为 None。
empty()¶
如果流水线中有任何计划但尚未被消耗的工作
enable_api_check(enable)¶
允许在运行时启用或禁用 API 检查
property enable_memory_stats¶
如果为 True,则收集内存使用统计信息。
epoch_size(name=None)¶
流水线的 epoch 大小。
如果 name 参数为 None,则返回一个键值对字典(reader 名称,该 reader 的 epoch 大小)。如果 name 参数不为 None,则返回该 reader 的 epoch 大小。
参数
name (str, 可选, 默认 = None) – 应用于获取 epoch 大小的 reader。
property exec_async¶
如果为 true,则使用异步执行。
property exec_pipelined¶
如果为 true,则使用流水线执行模型。
property exec_separated¶
如果为 True,则 CPU 和 GPU 阶段有独立的预取队列。
executor_statistics()¶
将提供的流水线执行器统计元数据作为字典返回。字典中的每个键都是操作符名称。要启用它,请使用 executor_statistics。
每个操作符的可用元数据键:
real_memory_size - 由操作符的每个输出使用的内存大小列表。列表中的索引对应于输出索引。
max_real_memory_size - 每个操作符输出使用的最大张量大小列表。列表中的索引对应于输出索引。
reserved_memory_size - 为每个操作符输出保留的内存大小列表。列表中的索引对应于输出索引。
max_reserved_memory_size - 为每个操作符输出保留的每个张量的最大内存大小列表。列表中的索引对应于输出索引。
feed_input(data_node, data, layout=None, cuda_stream=None, use_copy_kernel=False)¶
将多维数组或 DLPack(或其列表)传递给 ExternalSource 的输出。对于 GPU 输入,数据必须在与 feed_input 使用的流相同的流上进行修改。有关详情,请参阅 cuda_stream 参数。
参数
data_node (DataNode 或 str) – nvidia.dali.fn.external_source 节点的名称,或者通过调用该 ExternalSource 返回的 DataNode 对象。
data (ndarray 或 DLPack 或其列表) –
数组可以是以下之一:
NumPy ndarray(CPU)
MXNet ndarray(CPU)
PyTorch tensor(CPU 或 GPU)
CuPy array(GPU)
实现 __cuda_array_interface__ 的对象
DALI TensorList 或 DALI Tensor 对象列表
作为 data_node 所指向的 ExternalSource 的输出使用的数据。
layout (str 或 None) – 数据布局的描述(如果未指定,则为空字符串)。它应该是一个长度匹配数据维度的字符串,批处理维度除外。对于一批通道优先图像,应为“CHW”,对于通道最后的视频为“FHWC”等。如果数据是 DALI TensorList 或 DALI Tensor 对象列表,并且 layout 为 None,则从数据中获取布局。
cuda_stream(可选,cudaStream_t 或可转换为 cudaStream_t 的对象,例如 cupy.cuda.Stream、torch.cuda.Stream) –
用于将数据从 GPU 复制到 GPU 或从 GPU 源复制的 CUDA 流。如果未设置,将尽力保持正确性 - 即,如果数据以来自识别库(CuPy、PyTorch)的张量/数组形式提供,则使用库的当前流。这在典型场景中应该能够正常工作,但高级用例(以及使用不受支持的库的代码)可能仍然需要显式提供流句柄。
特殊值:
0 - 使用默认 CUDA 流
-1 - 使用 DALI 的内部流
如果使用内部流,则 feed_input 调用将阻塞,直到将数据复制到内部缓冲区完成,因为没有办法与此流同步以防止在另一个流中用新数据覆盖数组。
use_copy_kernel(可选,bool) – 如果设置为 True,则 DALI 将使用 CUDA 核心来传送数据(仅在将数据复制到/从 GPU 内存时适用),而不是使用 cudaMemcpyAsync(默认)。
property gpu_queue_size¶
GPU 阶段提前处理的迭代次数。
iter_setup()¶
用户定义的流水线可以重写此函数,以执行每次迭代所需的任何设置。例如,可以使用此函数从 NumPy 数组中提供输入数据。
property max_batch_size¶
最大批处理大小。
property max_streams¶
保留供将来使用。
property num_threads¶
该流水线使用的 CPU 线程数。
outputs()¶
返回流水线的输出并释放先前的缓冲区。
如果流水线是异步执行的,此函数会阻塞,直到结果可用。如果数据集达到其末尾,通常是在 iter_setup 无法产生更多数据时,它会引发 StopIteration。
返回
各自流水线输出的 TensorList 对象列表。
static pop_current()¶
将先前的流水线还原为当前流水线。与 push_current() 相对应。
property prefetch_queue_depth¶
预取队列的深度(或深度),如在 __init__ 参数中指定的那样。
static push_current(pipeline)¶
设置流水线为当前流水线,并将先前的当前流水线存储在堆栈上。要将先前的流水线还原为当前流水线,请使用 pop_current()。
为确保在发生异常时正确还原流水线,请使用上下文管理器(with my_pipeline:)。
当前流水线需要调用具有副作用或无输出的操作符。此类操作符的示例包括 PythonFunction(可能具有副作用)或 DumpImage(无输出)。
如果标记了具有 preserve=True 的任何悬空操作符,那么任何悬空操作符都可以标记为具有副作用,这对于调试可能很有用 - 否则,不会贡献到流水线输出的操作符将从图中移除。
property py_num_workers¶
并行 `external_source` 使用的 Python 工作进程数。
property py_start_method¶
并行 `external_source` 使用的启动 Python 工作进程的方法。
reader_meta(name=None)¶
将提供的读取器元数据作为字典返回。如果未提供名称,则为所有读取器提供数据的字典形式为 {reader_name : meta}。
可用的元数据键:
epoch_size: 原始的 epoch 大小
epoch_size_padded: 在末尾填充以使其可被分片数整除的 epoch 大小
number_of_shards: 分片数
shard_id: 给定读取器的分片 ID
pad_last_batch: 是否应填充最后一批数据
stick_to_shard: 是否应使读取器与其分片保持一致
参数:name (str, 可选, 默认 = None) – 用于获取分片数的读取器。
release_outputs()¶
释放通过 share_outputs 调用返回的缓冲区。
当输出调用结果被消耗(复制)并且在下一次调用 share_outputs 之前可以将缓冲区标记为自由时,它会有所帮助。它为用户提供了更好的控制,可以确定何时运行管道、何时获取结果缓冲区以及在结果被消耗后何时将它们返回到 DALI 池中。需要与 schedule_run() 和 share_outputs() 一起使用。不应该与同一管道中的 run() 混合使用。
reset()¶
重置管道迭代器。
如果管道迭代器达到末尾,则将其状态重置为开头。
run()¶
运行管道并返回结果。
如果使用 exec_pipelined 选项设置为 True 创建了管道,则该函数还将启动预取下一次迭代,以加快执行速度。不应该与同一管道中的 schedule_run()、share_outputs() 和 release_outputs() 混合使用。
返回
适用于相应管道输出的 TensorList 对象列表
save_graph_to_dot_file(filename, show_tensors=False, show_ids=False, use_colors=False)¶
将管道图保存到文件中。
参数
filename (str) – 要将图写入的文件的名称。
show_tensors (bool) – 在图中显示 Tensor 节点(默认情况下只显示操作符节点)
show_ids (bool) – 将节点 ID 添加到图表示中
use_colors (bool) – 是否使用颜色来区分阶段
schedule_run()¶
运行管道而不返回结果缓冲区。
如果使用 exec_pipelined 选项设置为 True 创建了管道,则该函数还将启动预取下一次迭代,以加快执行速度。它为用户提供了更好的控制,可以确定何时运行管道、何时获取结果缓冲区以及在结果被消耗后何时将它们返回到 DALI 缓冲池中。需要与 release_outputs() 和 share_outputs() 一起使用。不应该与同一管道中的 run() 混合使用。
property seed¶
管道中使用的随机种子,如果种子没有固定,则为 None。
serialize(define_graph=None, filename=None)¶
将管道序列化为 Protobuf 字符串。
此外,您可以传递文件名,这样序列化的管道将被写入其中。文件内容将被覆盖。
参数
define_graph (callable) – 如果指定了此函数,将使用此函数而不是成员 define_graph()。如果使用 set_outputs() 指定了管道输出,则不能设置此参数。
filename (str) – 序列化管道将写入的文件。
kwargs (dict) – 有关参数的完整列表,请参阅 Pipeline 构造函数。
property set_affinity¶
如果为 True,则工作线程绑定到 CPU 核心。
set_outputs(*output_data_nodes)¶
设置管道的输出。
使用此函数是在派生类中覆盖 define_graph 的另一种选择。
参数
*output_data_nodes (未打包的 DataNode 对象列表) – 管道的输出
share_outputs()¶
返回管道的输出。
与 outputs() 的主要区别在于 share_outputs 不会释放返回的缓冲区,需要调用 release_outputs 来实现。如果管道异步执行,则此函数将阻塞,直到结果变为可用。它为用户提供了更好的控制,可以确定何时运行管道、何时获取结果缓冲区以及在结果被消耗后何时将它们返回到 DALI 池中。需要与 release_outputs() 和 schedule_run() 一起使用。不应该与同一管道中的 run() 混合使用。
返回
适用于相应管道输出的 TensorList 对象列表
start_py_workers()¶
启动 Python 工作线程(将运行 ExternalSource 回调)。当使用 fork 启动 Python 工作线程(py_start_method="fork")时,您需要在调用任何创建或获取 CUDA 上下文的功能之前调用 start_py_workers()。当不需要这种分离时,Pipeline.build() 方法会自动调用它。
如果您将要构建多个通过 fork 进程启动 Python 工作线程的管道,则需要在调用任何管道的 build() 方法之前在所有这些管道上调用 start_py_workers() 方法,因为 build 会为当前进程获取 CUDA 上下文。
对于使用任何其他可能创建 CUDA 上下文的功能也是如此——例如,初始化使用 CUDA 的框架或使用它创建 CUDA 张量。当使用 py_start_method="fork" 时,在调用此类功能之前需要调用 start_py_workers()。
不支持在具有 CUDA 上下文的进程中使用 fork。这可能会导致意外错误。
如果使用该方法,调用 build() 时不能指定 define_graph 参数。
Pipeline装饰器
@nvidia.dali.pipeline_def(fn=None, **pipeline_kwargs)¶
这是一个将图定义函数转换为DALI管道工厂的装饰器。
图定义函数是一个返回预期管道输出的函数。您可以使用@pipeline_def装饰这个函数:
@pipeline_def
def my_pipe(flip_vertical, flip_horizontal):
''' Creates a DALI pipeline, which returns flipped and original images '''
data, _ = fn.readers.file(file_root=images_dir)
img = fn.decoders.image(data, device="mixed")
flipped = fn.flip(img, horizontal=flip_horizontal, vertical=flip_vertical)
return flipped, img
被装饰的函数返回一个DALI Pipeline对象:
pipe = my_pipe(True, False)
# pipe.build() # the pipeline is not configured properly yet
一个管道需要额外的参数,比如批量大小、工作线程数、GPU设备ID等(请参阅nvidia.dali.Pipeline()获取完整的管道参数列表)。这些参数可以作为额外的关键字参数提供,传递给被装饰的函数:
pipe = my_pipe(True, False, batch_size=32, num_threads=1, device_id=0)
pipe.build() # the pipeline is properly configured, we can build it now
原始函数的输出成为Pipeline的输出:
flipped, img = pipe.run()
当一些管道参数是固定的时,它们可以在装饰器中通过名称指定:
@pipeline_def(batch_size=42, num_threads=3)
def my_pipe(flip_vertical, flip_horizontal):
...
在调用被装饰的函数时传递的任何Pipeline构造函数参数都会覆盖装饰器中定义的参数:
@pipeline_def(batch_size=32, num_threads=3)
def my_pipe():
data = fn.external_source(source=my_generator)
return data
pipe = my_pipe(batch_size=128) # batch_size=128 overrides batch_size=32
警告
被装饰的函数的参数可能会覆盖管道构造函数的参数,这样就无法更改它们的值。
注意
在定义图形的函数中使用**kwargs(可变关键字参数)是不允许的。它们可能导致Pipeline构造函数对同名参数进行意外且静默的劫持。以这种方式编写的代码在将来DALI版本中添加新参数到Pipeline构造函数时将无法工作。
要在@pipeline_def函数体内访问任何管道参数,可以使用函数nvidia.dali.Pipeline.current()。
@pipeline_def()
def my_pipe():
pipe = Pipeline.current()
batch_size = pipe.batch_size
num_threads = pipe.num_threads
...
pipe = my_pipe(batch_size=42, num_threads=3)
...
数据节点
class nvidia.dali.pipeline.DataNode(name, device='cpu', source=None)¶
这个类是对TensorList的符号表示,用于图定义阶段。它不携带实际数据,而是用于定义运算符之间的连接并指定管道输出。详细信息请参阅Pipeline的文档。
DataNode对象可以作为输入传递给DALI运算符(以及一些命名关键字参数),但它们也提供了算术操作,隐式创建执行表达式的适当运算符。
Pipeline调试模式(实验性)
通过将@nvidia.dali.pipeline_def装饰器替换为其实验性变体@nvidia.dali.experimental.pipeline_def,并将debug参数设置为True,可以在调试模式下运行Pipeline。这允许您访问和修改管道执行图中的数据,以及使用非DALI数据类型作为DALI运算符的输入。
在此模式下,运算符的输出类型为DataNodeDebug,它相当于标准模式中的DataNode。您可以对DataNodeDebug类型的对象执行与DataNode相同的操作,包括算术操作。
使用.get()来访问与DataNodeDebug对象关联的数据,在Pipeline.run()的当前执行期间:
@nvidia.dali.experimental.pipeline_def(debug=True)
def my_pipe():
data, _ = fn.readers.file(file_root=images_dir)
img = fn.decoders.image(data)
print(np.array(img.get()[0]))
...
直接使用非DALI数据类型(例如NumPy数组、PyTorch张量)与DALI运算符一起使用:
@nvidia.dali.experimental.pipeline_def(batch_size=8, debug=True)
def my_pipe():
img = [np.random.rand(640, 480, 3) for _ in range(8)]
output = fn.flip(img)
...
注意:
调试模式中的种子生成方式与标准模式不同(它是确定性的,但不同)。如果您希望在调试模式下获得与标准模式相同的结果,请使用种子参数初始化运算符。
直接调用运算符仅适用于pipeline_def函数的范围内,在pipeline_def之外无法以这种方式使用它们。
您不能在迭代之间更改管道内运算符的顺序。
警告:
使用调试模式会严重降低管道的性能。仅在调试目的下使用。
注意:
此功能是实验性的,其API可能会在没有事先通知的情况下发生更改。