分布式执行引擎ray入门--(2)Ray Data

目录

一、overview

基础代码

核心API:

二、核心概念

2.1 加载数据

从S3上读

从本地读: 

 其他读取方式

  读取分布式数据(spark)

 从ML libraries 库中读取(不支持并行读取)

从sql中读取

2.2 变换数据

map

flat_map

Transforming batches

Shuffling rows

Repartitioning data

2.3 消费数据

1) 按行遍历

2)按batch遍历

3)遍历batch时shuffle

4)为分布式并行训练分割数据

2.4 保存数据

保存文件

修改分区数

将数据转换为python对象

将数据转换为分布式数据(spark)


今天来带大家一起来学习下ray中对数据的操作,还是非常简洁的。

一、overview

 

基础代码

from typing import Dict
import numpy as np
import ray

# Create datasets from on-disk files, Python objects, and cloud storage like S3.
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

# Apply functions to transform data. Ray Data executes transformations in parallel.
def compute_area(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    length = batch["petal length (cm)"]
    width = batch["petal width (cm)"]
    batch["petal area (cm^2)"] = length * width
    return batch

transformed_ds = ds.map_batches(compute_area)

# Iterate over batches of data.
for batch in transformed_ds.iter_batches(batch_size=4):
    print(batch)

# Save dataset contents to on-disk files or cloud storage.
transformed_ds.write_parquet("local:///tmp/iris/")

使用ray.data可以方便地从硬盘、python对象、S3上读取文件

最后写入云端

核心API:

  • 简单变换(map_batches())

  • 全局聚合和分组聚合(groupby())

  • Shuffling 操作 (random_shuffle(), sort(), repartition()).

二、核心概念

2.1 加载数据

  • 从S3上读

import ray

#加载csv文件
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
print(ds.schema())
ds.show(limit=1)

#加载parquet文件
ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")

#加载image
ds = ray.data.read_images("s3://anonymous@ray-example-data/batoidea/JPEGImages/")

# Text
ds = ray.data.read_text("s3://anonymous@ray-example-data/this.txt")

# binary
ds = ray.data.read_binary_files("s3://anonymous@ray-example-data/documents")

#tfrecords
ds = ray.data.read_tfrecords("s3://anonymous@ray-example-data/iris.tfrecords")
  • 从本地读: 

ds = ray.data.read_parquet("local:///tmp/iris.parquet")
  • 处理压缩文件
ds = ray.data.read_csv(
    "s3://anonymous@ray-example-data/iris.csv.gz",
    arrow_open_stream_args={"compression": "gzip"},
)
  •  其他读取方式

import ray

# 从python对象里获取
ds = ray.data.from_items([
    {"food": "spam", "price": 9.34},
    {"food": "ham", "price": 5.37},
    {"food": "eggs", "price": 0.94}
])


ds = ray.data.from_items([1, 2, 3, 4, 5])

# 从numpy里获取
array = np.ones((3, 2, 2))
ds = ray.data.from_numpy(array)

# 从pandas里获取
df = pd.DataFrame({
    "food": ["spam", "ham", "eggs"],
    "price": [9.34, 5.37, 0.94]
})
ds = ray.data.from_pandas(df)

# 从py arrow里获取

table = pa.table({
    "food": ["spam", "ham", "eggs"],
    "price": [9.34, 5.37, 0.94]
})
ds = ray.data.from_arrow(table)

  •   读取分布式数据(spark)

import ray
import raydp

spark = raydp.init_spark(app_name="Spark -> Datasets Example",
                        num_executors=2,
                        executor_cores=2,
                        executor_memory="500MB")
df = spark.createDataFrame([(i, str(i)) for i in range(10000)], ["col1", "col2"])
ds = ray.data.from_spark(df)

ds.show(3)

 从ML libraries 库中读取(不支持并行读取)

import ray.data
from datasets import load_dataset

# 从huggingface里读取(不支持并行读取)
hf_ds = load_dataset("wikitext", "wikitext-2-raw-v1")
ray_ds = ray.data.from_huggingface(hf_ds["train"])
ray_ds.take(2)


# 从TensorFlow中读取(不支持并行读取)
import ray
import tensorflow_datasets as tfds

tf_ds, _ = tfds.load("cifar10", split=["train", "test"])
ds = ray.data.from_tf(tf_ds)

print(ds)

从sql中读取

import mysql.connector

import ray

def create_connection():
    return mysql.connector.connect(
        user="admin",
        password=...,
        host="example-mysql-database.c2c2k1yfll7o.us-west-2.rds.amazonaws.com",
        connection_timeout=30,
        database="example",
    )

# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
    "SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
    "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)

Ray还支持从BigQuery和MongoDB中读取,篇幅问题,不赘述了。

2.2 变换数据

变换默认是lazy,直到遍历、保存、检视数据集时才执行

map

import os
from typing import Any, Dict
import ray

def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
    row["filename"] = os.path.basename(row["path"])
    return row

ds = (
    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple", include_paths=True)
    .map(parse_filename)
)


flat_map

from typing import Any, Dict, List
import ray

def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
    return [row] * 2

print(
    ray.data.range(3)
    .flat_map(duplicate_row)
    .take_all()
)

# 结果:
# [{'id': 0}, {'id': 0}, {'id': 1}, {'id': 1}, {'id': 2}, {'id': 2}]
# 原先的元素都变成2个

Transforming batches

from typing import Dict
import numpy as np
import ray

def increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    batch["image"] = np.clip(batch["image"] + 4, 0, 255)
    return batch


# batch_format:指定batch类型,可不加
ds = (
    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
    .map_batches(increase_brightness, batch_format="numpy")
)

如果初始化较贵,使用类而不是函数,这样每次调用类的时候,进行初始化。类有状态,而函数没有状态。

并行度可以指定(min,max)来自由调整

Shuffling rows

import ray

ds = (
    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
    .random_shuffle()
)

Repartitioning data

import ray

ds = ray.data.range(10000, parallelism=1000)

# Repartition the data into 100 blocks. Since shuffle=False, Ray Data will minimize
# data movement during this operation by merging adjacent blocks.
ds = ds.repartition(100, shuffle=False).materialize()

# Repartition the data into 200 blocks, and force a full data shuffle.
# This operation will be more expensive
ds = ds.repartition(200, shuffle=True).materialize()

2.3 消费数据

1) 按行遍历

import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

for row in ds.iter_rows():
    print(row)

2)按batch遍历

numpy、pandas、torch、tf使用不同的API遍历batch

# numpy
import ray
ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
for batch in ds.iter_batches(batch_size=2, batch_format="numpy"):
    print(batch)


# pandas
import ray
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
for batch in ds.iter_batches(batch_size=2, batch_format="pandas"):
    print(batch)


# torch
import ray
ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
for batch in ds.iter_torch_batches(batch_size=2):
    print(batch)


# tf
import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

tf_dataset = ds.to_tf(
    feature_columns="sepal length (cm)",
    label_columns="target",
    batch_size=2
)
for features, labels in tf_dataset:
    print(features, labels)

3)遍历batch时shuffle

只需要在遍历batch时增加local_shuffle_buffer_size参数即可。

非全局洗牌,但性能更好。

import ray

ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")

for batch in ds.iter_batches(
    batch_size=2,
    batch_format="numpy",
    local_shuffle_buffer_size=250,
):
    print(batch)

4)为分布式并行训练分割数据

import ray

@ray.remote
class Worker:

    def train(self, data_iterator):
        for batch in data_iterator.iter_batches(batch_size=8):
            pass

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
workers = [Worker.remote() for _ in range(4)]
shards = ds.streaming_split(n=4, equal=True)
ray.get([w.train.remote(s) for w, s in zip(workers, shards)])

2.4 保存数据

保存文件

非常类似pandas保存文件,唯一的区别保存本地文件时需要加入local://前缀。

注意:如果不加local://前缀,ray则会将不同分区的数据写在不同节点上

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

# local
ds.write_parquet("local:///tmp/iris/")

# s3
ds.write_parquet("s3://my-bucket/my-folder")

修改分区数

import os
import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
ds.repartition(2).write_csv("/tmp/two_files/")

print(os.listdir("/tmp/two_files/"))

将数据转换为python对象

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

df = ds.to_pandas()
print(df)

将数据转换为分布式数据(spark)

import ray
import raydp

spark = raydp.init_spark(
    app_name = "example",
    num_executors = 1,
    executor_cores = 4,
    executor_memory = "512M"
)

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
df = ds.to_spark(spark)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/445593.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

html--彩虹马

文章目录 htmljscss 效果 html <!DOCTYPE html> <html lang"en" > <head> <meta charset"UTF-8"> <title>Rainbow Space Unicorn</title> <link rel"stylesheet" href"css/style.css"> &l…

TCP/IP 七层架构模型

传输控制协议&#xff08;TCP&#xff0c;Transmission Control Protocol&#xff09;是一种面向连接的、可靠的、基于字节流的传输层通信协议。 套接字&#xff08;socket&#xff09;是一个抽象层&#xff0c;应用程序可以通过它发送或接收数据&#xff0c;可对其进行像对文…

【Linux】常用操作命令

目录 基本命令关机和重启帮助命令 用户管理命令添加用户&#xff1a;useradd 命令修改密码&#xff1a;passwd 命令查看登录用户&#xff1a;who 命令查看登录用户详细信息 :w切换用户 目录操作命令cdpwd命令目录查看 ls [-al] 目录操作【增&#xff0c;删&#xff0c;改&#…

NUMA(Non-Uniform Memory Access)架构的介绍

1. NUMA由来 最早的CPU是以下面这种形式访问内存的&#xff1a; 在这种架构中&#xff0c;所有的CPU都是通过一条总线来访问内存&#xff0c;我们把这种架构叫做SMP架构&#xff08;Symmetric Multi-Processor&#xff09;&#xff0c;也就是对称多处理器结构。可以看出来&…

Uniapp开发模板unibest

&#x1f3e0;简介 unibest 是一个集成了多种工具和技术的 uniapp 开发模板&#xff0c;由 uniapp Vue3 Ts Vite4 UnoCss uv-ui VSCode 构建&#xff0c;模板具有代码提示、自动格式化、统一配置、代码片段等功能&#xff0c;并内置了许多常用的基本组件和基本功能&#…

【PowerMockito:编写单元测试过程中原方法使用@Value注解注入的属性出现空指针】

错误场景 执行到Value的属性时会出现空指针&#xff0c;因为Value的属性为null 解决方法 在测试类调用被测试方法前&#xff0c;提前设置属性值&#xff0c;属性可以先自己定义好 ReflectionTestUtils.setField(endpointConnectionService, "exportUdpList", lis…

Linux 之七:Linux 防火墙 和进程管理

防火墙 查看防火墙 查看 Centos7 的防火墙的状态 sudo systemctl status firewalld。 查看后&#xff0c;看到active(running)就意味着防火墙打开了。 关闭防火墙&#xff0c;命令为&#xff1a; sudo systemctl stop firewalld。 关闭后查看是否关闭成功&#xff0c;如果…

【机器学习】一文掌握逻辑回归全部核心点(上)。

逻辑回归核心点-上 1、引言2、逻辑回归核心点2.1 定义与目的2.2 模型原理2.2.1 定义解析2.2.2 公式2.2.3 代码示例 2.3 损失函数与优化2.3.1 定义解析2.3.2 公式2.3.3 代码示例 2.4 正则化2.4.1 分类2.4.2 L1正则化2.4.3 L2正则化2.4.4 代码示例 3、总结 1、引言 小屌丝&#…

从空白镜像创建Docker hello world

文章目录 写在前面基础知识方法一&#xff1a;使用echo工具方法二&#xff0c;使用c语言程序方法三&#xff0c;使用汇编语言小结 写在前面 尝试搞了下docker&#xff0c;网上的教程大多是让下载一个ubuntu这种完整镜像&#xff0c;寥寥几篇从空白镜像开始创建的&#xff0c;也…

Oracle VM VirtualBox安装Ubuntu桌面版

背景&#xff1a;学习Docker操作 虚拟机软件&#xff1a;Oracle VM VirtualBox 7.0 系统镜像&#xff1a;ubuntu-20.04.6-desktop-amd64.iso 在Oracle VM VirtualBox新建一个虚拟电脑 选择好安装的目录和选择系统环境镜像 设置好自定义的用户名、密码、主机名 选择一下运行内…

执行除法运算返回浮点数结果operator.truediv()返回商的整数部分operator.floordiv()

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 执行除法运算 返回浮点数结果 operator.truediv() 返回商的整数部分 operator.floordiv() 下列选项可以执行除法运算并得到浮点数结果的是&#xff08;&#xff09; import operator print(&…

凌鲨微应用架构

微应用是静态网页加上凌鲨提供的扩展能力而形成的一种应用&#xff0c;主要特点是开发便捷&#xff0c;安全。 微应用架构 组件说明 名称 说明 微应用 webview窗口&#xff0c;显示web服务器上的页面 接口过滤器 根据权限配置,屏蔽非授权接口访问 接口提供者 tauri注入…

文件操作上(c语言)

目录 1. 文件的作用2. 什么是文件2.1 程序文件2.2 数据文件2.3 文件名 3. 二进制文件和文本文件4. 文件的打开和关闭4.1 流和标准流4.1.1 流4.1.2 标准流 4.2 文件指针4.3 文件的打开与关闭4.3.1 文件的打开模式4.3.2 实例代码 1. 文件的作用 使用文件可以将数据进行持久化的保…

P1958 上学路线

难度&#xff1a;普及- 题目描述 你所在城市的街道好像一个棋盘&#xff0c;有 a 条南北方向的街道和 b 条东西方向的街道。南北方向的 a 条街道从西到东依次编号为 1 到 a&#xff0c;而东西方向的 b 条街道从南到北依次编号为 1 到 b&#xff0c;南北方向的街道 i 和东西方…

【期刊】ACM Transactions on Privacy and Security

首页截图 subject areas 混合模式 根据官网介绍&#xff0c;本期刊不在金OA行列&#xff0c;可以自主选择出版模式。 出版方向 Topics of interest include 发文量 季刊&#xff0c;发文量很小 图像安全领域 未在今年发表图像安全领域论文。

ARM基础----STM32处理器操作模式

STM32处理器操作模式 Cortex-M处理器操作模式、特权等级和栈指针操作模式栈指针CONTROL寄存器异常压栈时的SP指针 Cortex-A 处理器运行模型寄存器组 Cortex-M处理器操作模式、特权等级和栈指针 操作模式 处理模式&#xff1a;执行中断服务程序等异常处理&#xff0c;处理器具有…

SpringBoot自定义banner,自定义logo

SpringBoot自定义banner&#xff0c;自定义logo 在线网站 http://www.network-science.de/ascii/?spma2c6h.12873639.article-detail.9.7acc2c9aSTnQdW https://www.bootschool.net/ascii?spma2c6h.12873639.article-detail.8.7acc2c9aSTnQdW https://patorjk.com/softwa…

继承杂谈。

内容一览 前言继承的概念及定义继承的意义继承关系及访问限定符父类和子类对象之间的转化继承后的作用域继承与有元继承与静态成员多继承继承和组合的区别&#xff1a;继承的总结和反思 前言 面向对象的三大特性&#xff1a;封装继承和多态&#xff0c;这三种特性优者很紧密地联…

基于神经网络的偏微分方程求解器再度取得突破,北大字节的研究成果入选Nature子刊

目录 一.引言:神经网络与偏微分方程 二.如何基于神经网络求解偏微分方程 1.简要概述 2.基于神经网络求解偏微分方程的三大方向 2.1数据驱动 基于CNN 基于其他网络 2.2物理约束 PINN 基于 PINN 可测量标签数据 2.3物理驱动(纯物理约束) 全连接神经网路(FC-NN) CN…

C++特殊类设计【特殊类 || 单例对象 || 饿汉模式 || 懒汉模式】

目录 一&#xff0c;特殊类设计 1. 只在堆上创建的类 2. 只允许在栈上创建的类 3. 不能被继承的类 4. 不能被拷贝的类 5. 设计一个类&#xff0c;只能创建一个对象&#xff08;单例对象&#xff09; 饿汉模式 懒汉模式 C11静态成员初始化多线程安全问题 二&#xff…