飞书API(11):阿里云MaxCompute分区表入库

一、引入

前面入库阿里云 MaxCompute 的数据都是读取之后直接写入,保留数据最新的状态,如果我要保留历史的状态,怎么办呢?
MaxCompute 表有一个分区功能,可以自行定义分区。我们可以使用 MaxCompute 表的分区功能,每天写入一个分区,保留每天每条数据的状态。
表单的分区功能,除了存储每天数据明细信息,也可以根据数据中的某个维度做分区。
本文重点来探讨下怎么建分区表以及怎么写入分区表。

二、MaxCompute 分区表

什么是分区?
分区表是指拥有分区空间的表,可以理解为分类,通过分类把不同类型的数据放到不同的目录下。分类的标准就是分区字段,可以是一个,也可以是多个。
MaxCompute 将分区列的每个值作为一个分区(目录),多级分区则是将表的多个字段作为表的分区。
分区其实就像我们常用的文件夹的概念,一级分区类比根目录,二级分区则是根目录下的子文件夹,然后数据就是子文件夹下的某个文件,事实上,底层也是这么存储数据的。

分区表的意义在于优化查询。查询表时通过WHERE子句查询指定所需查询的分区,避免全表扫描,提高处理效率,降低计算费用。使用数据时,如果指定需要访问的分区名称,则只会读取相应的分区。所以合理设计和使用分区,可以提高查询性能、简化数据管理,并支持更灵活的数据访问和操作。
多级分区结构参考:
image.png

2.1 创建分区表

创建分区表时,给分区的参数传递值即可。下面通过两种方法展开。

方法一:通过 Schema 创建分区表。
调用阿里云odps.models中的Partition,传递nametypecomment三个参数,创建一个分区对象,然后放到列表中,传递给Schema即可。
由于后面涉及定制化需要重新创建 schema,所以这里把partitions也返回,以便传递给后续的定制函数。

from odps.models import Schema, Column, Partition
def generate_create_schema(store_fields_info_df):
    columns = []
    columns_index = {}
    for index, row in store_fields_info_df.iterrows():
        name = row['tb_field_name']; type=row['mc_type']; comment=row['feishu_field_name']
        # print(name,type,comment)
        columns.append(Column(name=name, type=type, comment=comment))
        columns_index[name] = (index, comment)
    columns.append(Column(name='record_id', type='string', comment='飞书表行record_id'))
    columns.append(Column(name='last_modified_time', type='datetime', comment='数据更新时间'))
    partitions = [Partition(name='pt', type='string', comment='分区字段')]
    schema = Schema(columns=columns, partitions=partitions)
    print(f'成功生成 MaxCompute 建表 schema。关联方法:generate_create_schema。')
    return schema, columns, columns_index, partitions

def cre_mc_table(db_table_name, schema):
    if o.exist_table(db_table_name):
        print(f'表单 {db_table_name} 已存在,不需要新建。关联方法:cre_mc_table。')
    else:
        table = o.create_table(db_table_name, schema, if_not_exists=True)
        print(f'成功创建 MaxCompute 表:{db_table_name}。关联方法:cre_mc_table。')

方法二:通过类建表 DDL 创建分区表。
这个方法比较简单,以 SQL 语法的形式写一个字段的信息,比如pt string comment "分区字段",然后直接传递给阿里云的建表函数o.create_table()即可。

def generate_create_ddl(store_fields_info_df):
    cre_ddl = ''
    for index, row in store_fields_info_df.iterrows():
        cre_ddl += f"{row['tb_field_name']} {row['mc_type']} comment '{row['feishu_field_name']}',"
    default_fields = "record_id string comment '飞书表行record_id',last_modified_time datetime comment '数据更新时间'"
    cre_ddl = cre_ddl + default_fields
    print(f'成功生成 MaxCompute 类建表 DDL。关联方法:generate_create_ddl。')
    return cre_ddl

def cre_mc_table(db_table_name, cre_ddl):
    if o.exist_table(db_table_name):
        print(f'表单 {db_table_name} 已存在,不需要新建。关联方法:cre_mysql_table。')
    else:
        table = o.create_table(db_table_name, (cre_ddl, 'pt string comment "分区字段" '), if_not_exists=True)
        print(f'创建 MaxCompute 表:{db_table_name}。关联方法:cre_mc_table。')

2.2 写入分区表

将数据写入分区表,需要额外指定分区字段的值。分区表的分区字段的值可以指定固定值,也可以使用处理结果中的某一列。此处直接使用 T-1 的日期,格式为:yyyymmdd。
这个指定的固定值可以使用datetime模块计算出来,也可以使用 DataWorks 的调度参数。
直接计算出来可以参考以下代码,其中datetime.datetime.today()能够获取到今天的日期,而datetime.timedelta(days=1)可以得到 1 天的时间间隔,将今天的日期减去 1 天便可得到昨天的日期,然后使用strftime('%Y%m%d')进行格式化便得到了最终的日期格式。

import datetime
pt_date = (datetime.datetime.today() - datetime.timedelta(days=1)).strftime('%Y%m%d')

使用 DataWorks 的调度参数则需要在右边侧边栏的属性里找到调度参数,新增一对参数,这里我把参数名定位仪为pt_date,参数值设置为$bizdate。参考如下:
image.png

设置完参数可以点击【调度参数预览】检查是否有问题,结果如下,结果是 T-1,且格式为:yyyymmdd,符合预期。
image.png

参数配置好了,那怎么使用呢?
阿里云提供了一个全局变量:args,可以直接使用args['ptdate']得到刚刚设置的ptdate参数的值。

了解了基本的信息之后,便可以进行开发,这里介绍两种方法:转为列表直接写入和通过 PyODPS 的 DataFrame 做中转再写入。

2.2.1 方法一:转为列表直接写入表。

直接在open_writer()传递参数值给partition参数即可,参数值的两种获取方式任选一种即可。特别注意的一点是要加上参数create_partition=True,即当分区不存在的时候创建分区。

import math

# 需要处理 nan 值
def replace_nan_with_none(data):
    """递归地将列表中的nan替换为None"""
    # math.isnan(df.values.tolist()[1][2])
    if isinstance(data, list):
        return [replace_nan_with_none(item) for item in data]
    elif isinstance(data, float) and math.isnan(data):
        return None
    else:
        return data
        
def insert_mc_table(feishu_df, table_name):
    """
    Maxcompute 插入数据
    """
    feishu_data_ary = feishu_df.values.tolist()
    feishu_data_ary = replace_nan_with_none(feishu_data_ary)
    # print(feishu_data_ary)
    t = o.get_table(table_name)
    # # 参数值获取1:使用 datetime 模块获取
    # ptdate = (datetime.datetime.today()- datetime.timedelta(days=1)).strftime('%Y%m%d')  #昨天
    # 参数值获取2:使用 DataWorks 调度参数获取
    ptdate = args['ptdate']
    with t.open_writer(partition=f"pt={ptdate}", create_partition=True) as writer:
        writer.write(feishu_data_ary)  
    print(f'成功将飞书数据写入 MaxCompute 数据表:{table_name}。关联方法:insert_mc_table。')

2.2.2 方法二:转为 PyODPS DataFrame 再写入表。

直接在persist()传递参数值给partition参数即可,参数值的两种获取方式任选一种即可。特别注意的一点是要加上参数create_partition=True,即当分区不存在的时候创建分区。

def insert_mc_table(feishu_df, mc_table_name, mc_df_astype):
    """
    生成 PyODPS DataFrame 并将数据插入 Maxcompute 表
    """
    aliyun_df = DataFrame(feishu_df,as_type=mc_df_astype)
    # # 参数值获取1:使用 datetime 模块获取
    # ptdate = (datetime.datetime.today()- datetime.timedelta(days=1)).strftime('%Y%m%d')  #昨天
    # 参数值获取2:使用 DataWorks 调度参数获取
    ptdate = args['ptdate']
    aliyun_df.persist(mc_table_name, partition=f"pt={ptdate}", create_partition=True)    
    print(f'成功将飞书数据写入 MaxCompute 数据表:{mc_table_name}。关联方法:insert_mc_table。')

另外,一个数据出入常见的问题:如果执行多次是在原分区追加插入,还是覆盖插入,或是删除原分区重新新建分区插入?
使用 列表插入,即使用write_table()方法插入,在写入数据时会追加到原有数据中,PyODPS 不提供覆盖数据的选项,如果需要覆盖数据,需要手动清除原有数据。对于非分区表,可以调用truncate()方法;对于分区表,需要删除分区后再建立新的分区。
而转为 PyODPS 的 DataFrame 再插入数据,阿里云提供了drop_partitioncreate_partition参数,当分区存在时,先将分区删除,然后再创建新的同名(参数值)分区。参考代码如下,即在上面的代码的persist()方法多加上一对参数drop_partition=True

df.persist('table_name', partition='ds=test', drop_partition=True, create_partition=True)

2.3 迭代定制化函数

前面介绍了两种建表方式和两种写表方式,再进行定制化的时候,需要修改建表的逻辑和写表的逻辑,建表和写表的方法两两结合可以有四种组合,下面拿其中两种展开阐述,分别是 Schema + DataFrame 和 类 DDL+列表。

2.3.1 Schema + DataFrame

当需要局部进行定制化迭代的时候,如果是使用 Schema 需要重新生成,使用分区表时,需要多传递分区信息,此处将前面创建表单的partitions传递到函数中,并用于最终重新生成 schema。
当使用 PyODPS 的 DataFrame 中转将数据入库的时候,还需要处理 DataFrame 的 astype,值得注意的时,DataFrame 中没有 date 类型,所以时间列转为日期不需要修改 astype。

def custom_field(df_return, columns, columns_index, mc_df_astype, partitions):
    # 2.1 场景一:把数字入库为 int 类型
    # 修改 SQL 即可
    # cre_ddl = cre_ddl.replace('field_number float','field_number int')
    column = Column(name='field_number', type='bigint', comment=columns_index['field_number'][1])
    columns[columns_index['field_number'][0]] = column
    #修改mc_df_astype
    mc_df_astype['field_number'] = 'int64'
    
    # 2.2 场景二:把日期入库为 date 类型
    # 修改 df,MySQL会自动截断,Maxcompute不行,需要使用 x.date() 处理
    df_return['field_createdtime'] = df_return['field_createdtime'].apply(lambda x:x.date())
    # 修改 SQL
    # cre_ddl = cre_ddl.replace('field_createdtime datetime','field_createdtime date')
    column = Column(name='field_createdtime', type='date', comment=columns_index['field_createdtime'][1])
    columns[columns_index['field_createdtime'][0]] = column
    # #修改mc_df_astype
    # mc_df_astype['field_createdtime'] = 'date'
    
    # 2.3 场景三:日期给定默认最大值
    # 修改 df 即可
    #默认值改为 2222-01-01 00:00:00
    mask = df_return['field_date'] == pd.Timestamp('1970-01-01 08:00:01')
    df_return.loc[mask, 'field_date'] = pd.Timestamp('2222-01-01 00:00:00')
    
    # 2.4 场景四:公式保留具体值
    # 修改 df
    # 修改 SQL
    df_return['field_numformula'] = df_return['field_numformula'].apply(lambda x:json.loads(x)['value'][0])
    # cre_ddl = cre_ddl.replace('field_numformula varchar(256)','field_numformula int')
    column = Column(name='field_numformula', type='bigint', comment=columns_index['field_numformula'][1])
    columns[columns_index['field_numformula'][0]] = column
    #修改mc_df_astype
    mc_df_astype['field_numformula'] = 'int64'

    # 创建新的 schema
    schema = Schema(columns=columns, partitions=partitions)
    print('定制函数打印数据和建表语句')
    print('----------------------------------------------\n', df_return[['field_number','field_createdtime','field_date','field_numformula']].head(5))
    print('----------------------------------------------\n', schema.columns)
    return df_return, schema, mc_df_astype

2.3.2 类 DDL+列表

在定制化某些列信息时,如果是使用类 建表的 DDL 语句,根据建表的规则,识别对应的字符串进行替换即可。使用列表写入数据则不需要进行修改。


def custom_field(df_return, cre_ddl):
    # 2.1 场景一:把数字入库为 int 类型
    # 修改 SQL 即可
    cre_ddl = cre_ddl.replace('field_number float','field_number int')
    
    # 2.2 场景二:把日期入库为 date 类型
    # 修改 df,MySQL会自动截断,Maxcompute不行,需要使用 x.date() 处理
    df_return['field_createdtime'] = df_return['field_createdtime'].apply(lambda x:x.date())
    # 修改 SQL
    cre_ddl = cre_ddl.replace('field_createdtime datetime','field_createdtime date')

    # 2.3 场景三:日期给定默认最大值
    # 修改 df 即可
    #默认值改为 2222-01-01 00:00:00
    mask = df_return['field_date'] == pd.Timestamp('1970-01-01 08:00:01')
    df_return.loc[mask, 'field_date'] = pd.Timestamp('2222-01-01 00:00:00')
    
    # 2.4 场景四:公式保留具体值
    # 修改 df
    # 修改 SQL
    df_return['field_numformula'] = df_return['field_numformula'].apply(lambda x:json.loads(x)['value'][0])
    cre_ddl = cre_ddl.replace('field_numformula varchar(256)','field_numformula int')

    print('定制函数打印数据和建表语句')
    print('----------------------------------------------\n', df_return[['field_number','field_createdtime','field_date','field_numformula']].head(5))
    print('----------------------------------------------\n', schema.columns)
    return df_return, cre_ddl

三、整合代码(Schema+DataFrame)

上面介绍了两种建表方式和两种写入方式,可以有4种搭配,这里提供 schema+DataFrame 的,即在上一小节最终的代码的基础上在generate_create_schema()custom_field()insert_mc_table()三个函数中都加上分区字段即可。
两点说明:

  • 该代码加上分区处理逻辑:当分区存在时,删除重建,确保重复跑数据不会重复。
  • 该代码使用了 DataWorks 的调度参数,所以使用时需要配置调度参数:pt_date=$bizdate

import requests
import json
import datetime
import pandas as pd
from sqlalchemy import create_engine, text
from urllib.parse import urlparse, parse_qs
from odps.models import Schema, Column, Partition
import math

def get_table_params(bitable_url):
    # bitable_url = "https://feishu.cn/base/aaaaaaaa?table=tblccc&view=vewddd"
    parsed_url = urlparse(bitable_url)              #解析url:(ParseResult(scheme='https', netloc='feishu.cn', path='/base/aaaaaaaa', params='', query='table=tblccc&view=vewddd', fragment='')
    query_params = parse_qs(parsed_url.query)       #解析url参数:{'table': ['tblccc'], 'view': ['vewddd']}
    app_token = parsed_url.path.split('/')[-1]
    table_id = query_params.get('table', [None])[0]
    view_id = query_params.get('view', [None])[0]
    print(f'成功解析链接,app_token:{app_token},table_id:{table_id},view_id:{view_id}。关联方法:get_table_params。')
    return app_token, table_id, view_id

def get_tenant_access_token(app_id, app_secret):
    url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
    payload = json.dumps({
        "app_id": app_id,
        "app_secret": app_secret
    })
    headers = {'Content-Type': 'application/json'}
    response = requests.request("POST", url, headers=headers, data=payload)
    tenant_access_token = response.json()['tenant_access_token']
    print(f'成功获取tenant_access_token:{tenant_access_token}。关联函数:get_table_params。')
    return tenant_access_token

def get_bitable_datas(tenant_access_token, app_token, table_id, view_id, page_token='', page_size=20):
    url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records/search?page_size={page_size}&page_token={page_token}&user_id_type=user_id"
    payload = json.dumps({"view_id": view_id})
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {tenant_access_token}'
    }
    response = requests.request("POST", url, headers=headers, data=payload)
    print(f'成功获取page_token为【{page_token}】的数据。关联函数:get_bitable_datas。')
    return response.json()


def get_all_bitable_datas(tenant_access_token, app_token, table_id, view_id, page_token='', page_size=20):
    has_more = True
    feishu_datas = []
    while has_more:
        response = get_bitable_datas(tenant_access_token, app_token, table_id, view_id, page_token, page_size)
        if response['code'] == 0:
            page_token = response['data'].get('page_token')
            has_more = response['data'].get('has_more')
            # print(response['data'].get('items'))
            # print('\n--------------------------------------------------------------------\n')
            feishu_datas.extend(response['data'].get('items'))
        else:
            raise Exception(response['msg'])
    print(f'成功获取飞书多维表所有数据,返回 feishu_datas。关联函数:get_all_bitable_datas。')
    return feishu_datas

def get_bitable_fields(tenant_access_token, app_token, table_id, page_size=500):
    url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/fields?page_size={page_size}"
    payload = ''
    headers = {'Authorization': f'Bearer {tenant_access_token}'}
    
    response = requests.request("GET", url, headers=headers, data=payload)
    field_infos = response.json().get('data').get('items')
    print('成功获取飞书字段信息,关联函数:get_bitable_fields。')
    return field_infos

def merge_list(ls_from, ls_join, on=None, left_on=None, right_on=None):
    """将两个[{},{}]结构的数据合并"""
    df_from = pd.DataFrame(ls_from)
    df_join = pd.DataFrame(ls_join)
    if on is not None:
        df_merge = df_from.merge(df_join, how='left', on=on)
    else:
        df_merge = df_from.merge(df_join, how='left', left_on=left_on, right_on=right_on) # , suffixes=('', '_y')
    print(f'成功合并列表或DataFrame。关联方法:merge_list。')
    return df_merge


def extract_key_fields(feishu_datas, store_fields_info_df):
    """处理飞书数据类型编号的数据"""
    print('开始处理飞书多维表关键字段数据...')
    # 需要record_id 和 订单号,用于和数据库数据匹配
    df_feishu = pd.DataFrame(feishu_datas)
    df_return = pd.DataFrame()
    
    #根据列的数据类型,分别处理对应的数据。注意:仅返回以下列举的数据类型,如果fields_map的内容包含按钮、流程等数据类型的飞书列,忽略。
    for index, row in store_fields_info_df.iterrows():
        if row['type'] == 1:       #文本
            df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:x.get(row['field_name'],[{}])[0].get('text'))
        
        elif row['type'] in (2, 3, 4, 7, 13, 1005):  #数字、单选、多选、复选框、手机号、自动编号
            df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:x.get(row['field_name']))
        
        elif row['type'] in (5, 1001, 1002):         #日期(包含创建和更新),需要加 8 小时,即 8*60*60*1000=28800 秒
            df_return[row['tb_field_name']] = pd.to_datetime(df_feishu['fields'].apply(lambda x:28800 + int(x.get(row['field_name'],1000)/1000)), unit='s')
        
        elif row['type'] in(11, 23, 1003, 1004):       #人员、群组、创建人、修改人,遍历取name
            df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x: ','.join([i.get('name') for i in x.get(row['field_name'],[{"name":""}])]))  # 需要遍历

        elif row['type'] == 15:    #链接
            df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:x.get(row['field_name'],{}).get('link'))
        
        elif row['type'] == 17:    #附件,遍历取url
            df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:[i.get('url') for i in x.get(row['field_name'],[{}])]) #需要遍历
    
        elif row['type'] in(18, 21):    #单向关联、双向关联,取link_record_ids
            df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:x.get(row['field_name'],{}).get('link_record_ids'))
    
        elif row['type'] in(19, 20):    #查找引用和公式
            df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:json.dumps(x.get(row['field_name'])))
    
        elif row['type'] == 22:    #地理位置
            df_return[row['tb_field_name']] = df_feishu['fields'].apply(lambda x:x.get(row['field_name'],{}).get('location'))
            
    #加上record_id
    df_return['record_id'] = df_feishu.record_id
    #加上表更新字段
    df_return['last_modified_time'] = datetime.datetime.now()
    print(f'成功提取入库字段的数据。关联方法:extract_key_fields。')
    return df_return

def generate_create_schema(store_fields_info_df):
    columns = []
    columns_index = {}
    for index, row in store_fields_info_df.iterrows():
        name = row['tb_field_name']; type=row['mc_type']; comment=row['feishu_field_name']
        # print(name,type,comment)
        columns.append(Column(name=name, type=type, comment=comment))
        columns_index[name] = (index, comment)
    columns.append(Column(name='record_id', type='string', comment='飞书表行record_id'))
    columns.append(Column(name='last_modified_time', type='datetime', comment='数据更新时间'))
    partitions = [Partition(name='pt', type='string', comment='分区字段')]
    schema = Schema(columns=columns, partitions=partitions)
    print(f'成功生成 MaxCompute 建表 schema。关联方法:generate_create_schema。')
    return schema, columns, columns_index, partitions

def cre_mc_table(db_table_name, schema):
    if o.exist_table(db_table_name):
        print(f'表单 {db_table_name} 已存在,不需要新建。关联方法:cre_mc_table。')
    else:
        table = o.create_table(db_table_name, schema, if_not_exists=True)
        print(f'成功创建 MaxCompute 表:{db_table_name}。关联方法:cre_mc_table。')

def generate_astype(store_fields_info_df):
    """
    功能:对整合了飞书列入库的列、飞书列类型、和阿里云 DataFrame 数据类型的 pandas DataFrame,提取字段名和阿里云DataFrame 数据类型
    """
    #生成阿里云DataFrame的as_type,用于将处理好的数据转为阿里云DataFrame,格式:{'field_no': 'string', 'field_select': 'float64', 'field_text': 'datatime'}
    mc_df_astype = store_fields_info_df.set_index('tb_field_name').to_dict()['mc_df_type']
    #加上record_id
    mc_df_astype['record_id'] = 'string'
    #加一个表更新时间
    mc_df_astype['last_modified_time'] = 'DATETIME'
    # print(mc_df_astype)
    print('成功生成阿里云 DataFrame 的 as_type。方法:generate_astype')
    return mc_df_astype

def insert_mc_table(feishu_df, mc_table_name, mc_df_astype):
    """
    生成 PyODPS DataFrame 并将数据插入 Maxcompute 表
    """
    aliyun_df = DataFrame(feishu_df,as_type=mc_df_astype)
    # # 参数值获取1:使用 datetime 模块获取
    # ptdate = (datetime.datetime.today()- datetime.timedelta(days=1)).strftime('%Y%m%d')  #昨天
    # 参数值获取2:使用 DataWorks 调度参数获取
    ptdate = args['ptdate']
    aliyun_df.persist(mc_table_name, partition=f"pt={ptdate}", drop_partition=True, create_partition=True)    
    print(f'成功将飞书数据写入 MaxCompute 数据表:{mc_table_name}。关联方法:insert_mc_table。')

def custom_field(df_return, columns, columns_index, mc_df_astype, partitions):
    # 2.1 场景一:把数字入库为 int 类型
    # 修改 SQL 即可
    # cre_ddl = cre_ddl.replace('field_number float','field_number int')
    column = Column(name='field_number', type='bigint', comment=columns_index['field_number'][1])
    columns[columns_index['field_number'][0]] = column
    #修改mc_df_astype
    mc_df_astype['field_number'] = 'int64'
    
    # 2.2 场景二:把日期入库为 date 类型
    # 修改 df,MySQL会自动截断,Maxcompute不行,需要使用 x.date() 处理
    df_return['field_createdtime'] = df_return['field_createdtime'].apply(lambda x:x.date())
    # 修改 SQL
    # cre_ddl = cre_ddl.replace('field_createdtime datetime','field_createdtime date')
    column = Column(name='field_createdtime', type='date', comment=columns_index['field_createdtime'][1])
    columns[columns_index['field_createdtime'][0]] = column
    # #修改mc_df_astype
    # mc_df_astype['field_createdtime'] = 'date'
    
    # 2.3 场景三:日期给定默认最大值
    # 修改 df 即可
    #默认值改为 2222-01-01 00:00:00
    mask = df_return['field_date'] == pd.Timestamp('1970-01-01 08:00:01')
    df_return.loc[mask, 'field_date'] = pd.Timestamp('2222-01-01 00:00:00')
    
    # 2.4 场景四:公式保留具体值
    # 修改 df
    # 修改 SQL
    df_return['field_numformula'] = df_return['field_numformula'].apply(lambda x:json.loads(x)['value'][0])
    # cre_ddl = cre_ddl.replace('field_numformula varchar(256)','field_numformula int')
    column = Column(name='field_numformula', type='bigint', comment=columns_index['field_numformula'][1])
    columns[columns_index['field_numformula'][0]] = column
    #修改mc_df_astype
    mc_df_astype['field_numformula'] = 'int64'

    # 创建新的 schema
    schema = Schema(columns=columns, partitions=partitions)
    print('定制函数打印数据和建表语句')
    print('----------------------------------------------\n', df_return[['field_number','field_createdtime','field_date','field_numformula']].head(5))
    print('----------------------------------------------\n', schema.columns)
    return df_return, schema, mc_df_astype

def main(mc_table_name, bitable_url, fields_map):
    # 基本配置
    app_token, table_id, view_id = get_table_params(bitable_url)
    app_id = 'your_app_id'
    app_secret = 'your_app_secret'
    tenant_access_token = get_tenant_access_token(app_id, app_secret)
    page_size = 50
    
    # 获取飞书多维表所有数据
    feishu_datas = get_all_bitable_datas(tenant_access_token, app_token, table_id, view_id, page_size=page_size)

    #获取飞书字段信息
    feishu_fields = get_bitable_fields(tenant_access_token, app_token, table_id)

    # 以 fields_map 为准关联数据
    store_fields_info_df = merge_list(fields_map, feishu_fields, left_on='feishu_field_name', right_on='field_name')

    # 处理入库字段数据
    feishu_df = extract_key_fields(feishu_datas, store_fields_info_df)
    
    # 关联入库数据类型
    data_type_map = [{"feishu_type": 1   ,"mc_type": "string"        ,"mc_df_type": "string"        }
                    ,{"feishu_type": 2   ,"mc_type": "double"        ,"mc_df_type": "float64"       }
                    ,{"feishu_type": 3   ,"mc_type": "string"        ,"mc_df_type": "string"        }
                    ,{"feishu_type": 4   ,"mc_type": "array<string>" ,"mc_df_type": "list<string>"  }
                    ,{"feishu_type": 5   ,"mc_type": "datetime"      ,"mc_df_type": "datetime"      }
                    ,{"feishu_type": 7   ,"mc_type": "boolean"       ,"mc_df_type": "boolean"       }
                    ,{"feishu_type": 11  ,"mc_type": "string"        ,"mc_df_type": "string"        }
                    ,{"feishu_type": 13  ,"mc_type": "string"        ,"mc_df_type": "string"        }
                    ,{"feishu_type": 15  ,"mc_type": "string"        ,"mc_df_type": "string"        }
                    ,{"feishu_type": 17  ,"mc_type": "array<string>" ,"mc_df_type": "list<string>"  }
                    ,{"feishu_type": 18  ,"mc_type": "array<string>" ,"mc_df_type": "list<string>"  }
                    ,{"feishu_type": 19  ,"mc_type": "string"        ,"mc_df_type": "string"        }
                    ,{"feishu_type": 20  ,"mc_type": "string"        ,"mc_df_type": "string"        }
                    ,{"feishu_type": 21  ,"mc_type": "array<string>" ,"mc_df_type": "list<string>"  }
                    ,{"feishu_type": 22  ,"mc_type": "string"        ,"mc_df_type": "string"        }
                    ,{"feishu_type": 23  ,"mc_type": "string"        ,"mc_df_type": "string"        }
                    ,{"feishu_type": 1001,"mc_type": "datetime"      ,"mc_df_type": "datetime"      }
                    ,{"feishu_type": 1002,"mc_type": "datetime"      ,"mc_df_type": "datetime"      }
                    ,{"feishu_type": 1003,"mc_type": "string"        ,"mc_df_type": "string"        }
                    ,{"feishu_type": 1004,"mc_type": "string"        ,"mc_df_type": "string"        }
                    ,{"feishu_type": 1005,"mc_type": "string"        ,"mc_df_type": "string"        }]

    store_fields_info_df = merge_list(store_fields_info_df, data_type_map, left_on='type', right_on='feishu_type')

    # 生成 MaxCompute schema
    schema, columns, columns_index, partitions = generate_create_schema(store_fields_info_df)

    # 生成 DataFrame astype
    mc_df_astype = generate_astype(store_fields_info_df)
    
    # 定制化
    feishu_df, schema, mc_df_astype = custom_field(feishu_df, columns, columns_index, mc_df_astype, partitions)

    # 建 MaxCompute 数据表
    cre_mc_table(mc_table_name, schema)

    # MaxCompute 表插入数据
    insert_mc_table(feishu_df, mc_table_name, mc_df_astype)
    
    print(f'成功将飞书多维表({bitable_url})的数据入库到 mysql 数据表:{mc_table_name}。')

if __name__ == '__main__':
    mc_table_name = 'for_ods.feishu_data_type_test'
    bitable_url = "https://forchangesz.feishu.cn/base/SpY3b9LMFaodpOsE0kdcGEyonbg?table=tbl5BZE0Aubjz5Yy&view=vewDM4NGlP"
    fields_map = [{'tb_field_name': 'field_text','feishu_field_name': '文本'}
            ,{'tb_field_name': 'field_email','feishu_field_name': 'email'}
            ,{'tb_field_name': 'field_select','feishu_field_name': '单选'}
            ,{'tb_field_name': 'field_mobile','feishu_field_name': '电话号码'}
            ,{'tb_field_name': 'field_no','feishu_field_name': '自动编号'}
            ,{'tb_field_name': 'field_member1','feishu_field_name': '人员1'}
            ,{'tb_field_name': 'field_group1','feishu_field_name': '群组1'}
            ,{'tb_field_name': 'field_creator','feishu_field_name': '创建人'}
            ,{'tb_field_name': 'field_modifier','feishu_field_name': '修改人'}
            ,{'tb_field_name': 'field_member2','feishu_field_name': '人员2'}
            ,{'tb_field_name': 'field_group2','feishu_field_name': '群组2'}
            ,{'tb_field_name': 'field_url','feishu_field_name': '超链接'}
            ,{'tb_field_name': 'field_location','feishu_field_name': '地理位置'}
            ,{'tb_field_name': 'field_findnum','feishu_field_name': '查找引用数值'}
            ,{'tb_field_name': 'field_numformula','feishu_field_name': '数字公式'}
            ,{'tb_field_name': 'field_number','feishu_field_name': '数字'}
            ,{'tb_field_name': 'field_progress','feishu_field_name': '进度'}
            ,{'tb_field_name': 'field_money','feishu_field_name': '货币'}
            ,{'tb_field_name': 'field_Rating','feishu_field_name': '评分'}
            ,{'tb_field_name': 'field_bool','feishu_field_name': '复选框'}
            ,{'tb_field_name': 'field_date','feishu_field_name': '日期'}
            ,{'tb_field_name': 'field_createdtime','feishu_field_name': '创建时间'}
            ,{'tb_field_name': 'field_updatedtime','feishu_field_name': '更新时间'}
            ,{'tb_field_name': 'field_mulselect','feishu_field_name': '多选'}
            ,{'tb_field_name': 'field_singleunion','feishu_field_name': '单向关联'}
            ,{'tb_field_name': 'field_doubleunion','feishu_field_name': '双向关联'}
            ,{'tb_field_name': 'field_file','feishu_field_name': '附件'}
            ]
    main(mc_table_name, bitable_url, fields_map)

写入成功之后,查询数据看看结构长什么样。执行以下 SQL:

select * from for_ods.feishu_data_type_test;

最终执行的结果参考如下,最后多了一个分区列pt,记录一个昨天的日期:
image.png

image.png

image.png

四、小结

本文探讨了将飞书多维表数据入库 MaxCompute 分区表的方法,介绍了MaxCompute分区表的概念和建表方法,并探讨了如何写入分区表。
分区表可以按照定义的分区字段将数据分为不同的目录,通过分区可以优化查询效率,提高查询性能。
建表时,可以先创建 Schema,再根据 schema 建表;也可以直接使用类建表的 DDL 直接建表。
写表时,可以将 Pandas 的 DataFrame 转为列表直接写入;也可以先转为 PyODPS 的 DataFrame 再写入。
两种创建分区表的方法和两种写入分区表的方法,它们可以两两组合搭配使用。

至此,关于飞书多维表的读取和入库基本介绍完了。/手动撒花

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/673209.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

01背包变式例题

传送门——P2370 yyy2015c01 的 U 盘 题解&#xff1a;题目意思很好理解&#xff0c;就是说&#xff0c;当能够达到预期的U盘的最小接口&#xff08;接口越大&#xff0c;能传递的文件越大&#xff09;&#xff0c;然后我们就需要先看题目了&#xff0c;有n个文件&#xff0c;每…

Spring 中如何控制 Bean 的加载顺序?

如果你脱口而出说添加 Order 注解或者是实现 Ordered 接口&#xff0c;那么恭喜&#xff0c;你掉坑了。 一 Order 注解和 Ordered 接口 在 Spring 框架中&#xff0c;Order 是一个非常实用的元注解&#xff0c;它位于 spring-core 包下&#xff0c;主要用于控制某些特定上下文…

适合技术小白学习的项目1863java在线视频网站系统 Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 java在线视频网站系统 是一套完善的web设计系统&#xff0c;对理解JSP java编程开发语言有帮助采用了java设计&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统采用web模式&#xff0c;系统主要采用B/S模式开发。 开发环境为TOMCAT7.0,Myeclipse8.5开发…

简单、免费、强大的高效率截图工具神器——Snipaste(下载安装+常用快捷键教学)

一、简介 Snipaste是一款功能强大的截图和贴图工具&#xff0c;它允许用户快速截取屏幕上的任意区域&#xff0c;并将截图以浮窗形式显示在屏幕上。用户可以自由调整浮窗的位置和大小&#xff0c;甚至将浮窗设置为半透明&#xff0c;以便在查看屏幕内容时不会遮挡视线。此外&a…

golang map部分原理源码个人走读-附个人理解过程图解

近期再写map的demo时出现了下面一段报错&#xff0c;于是带着疑惑去看了一下源码 目的&#xff1a;主要想知道为啥map不让并发读写 fatal error: concurrent map read and map write 一.map的数据结构 先有个印象&#xff0c;后续会详细介绍 // A header for a Go map. ty…

Educational Codeforces Round 166 (Rated for Div. 2) (A~C)

A. Verify Password 思路:按照ASCLL值进行比较就行(因为字母的ASCLL本来就在数字后面),所以,只要找到前面比后面的数大就输出NO,反之YES 代码实现: #include<bits/stdc.h> using namespace std; #define N 100005 typedef long long ll; ll n, m, num, sum, t; ll a[N]…

通过f-string编写简洁高效的Python格式化输出代码

Python 3.6中引入的f-string是Python中最常用的特征之一&#xff0c;它可以让我们编写更干净、更高效和更易于维护的代码&#xff0c;我们今天就由浅入深来详细介绍使用它的一些技巧。 对齐文本 在格式化输出时&#xff0c;对齐对可读性至关重要。无论是生成报告、记录数据还是…

kibana7.17.0查看index

kibana查看index Kibana 提供了一个直观的用户界面&#xff0c;可以查看和管理 Elasticsearch 的索引。以下是如何在 Kibana 中查看 Elasticsearch 索引的步骤&#xff1a; 1. 打开 Kibana 首先&#xff0c;确保 Kibana 已经启动并可以访问。通常&#xff0c;你可以通过浏览…

开箱即用的Spring Boot 企业级开发平台【毕设项目推荐】

项目概述 基于 Spring 实现的通用权限管理平台&#xff08;RBAC模式&#xff09;。整合最新技术高效快速开发&#xff0c;前后端分离模式&#xff0c;开箱即用。 核心模块包括&#xff1a;用户、角色、职位、组织机构、菜单、字典、日志、多应用管理、文件管理、定时任务等功能…

【自己动手】自制刷题系统(php+layui应用 社区工作者题库)

现在各种证都可以考&#xff0c;网上免费刷题的APP一大堆&#xff0c;我自己也想搞一个。网上的刷题软件有的用的很舒服&#xff0c;有的体检就很不好&#xff0c;热门的考试基本都有&#xff0c;不热门的基本就很差了&#xff0c;网上也有提供自制试卷的APP&#xff0c;但都有…

弹性云服务器ECS

ECS的功能&#xff1a; 1、类型丰富。支持多规格类型、多镜像类型、多磁盘种类。 2、灵活的计费模式。支持包年包月或按需计费模式购买云服务器&#xff0c;满足不同应用场景&#xff0c;根据业务波动随时购买或释放资源。 3、数据可靠。基于分布式架构的&#xff0c;可弹性…

2024年6月2日 (周日) 叶子游戏新闻

中医百科中药: 中医百科中药是一款非常强大的中药知识科普软件&#xff0c;该应用提供500多味中草药的文献资料&#xff0c;强大的搜索功能可根据功效、特点和关键词来快速查找中药&#xff0c;而且每味中药的图片、功效、主治、炮制方法等百科知识&#xff0c;可以很好的帮助你…

Opera 浏览器与Google联手,推出由Gemini驱动的全新AI功能

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

MedSegDiff-V2: Diffusion-Based Medical Image Segmentation with Transformer 论文总结

标题&#xff1a;MedSegDiff-V2: Diffusion-Based&#xff08;基于扩散模型&#xff09;Medical Image Segmentation&#xff08;医学图像分割&#xff09;with Transformer 论文&#xff08;AAAI&#xff09;&#xff1a;https://ojs.aaai.org/index.php/AAAI/article/view/28…

2024-6-2 石群电路-21

2024-6-2&#xff0c;星期日&#xff0c;天气&#xff1a;阴&#xff0c;心情&#xff1a;晴。今天没什么特别的事情发生&#xff0c;心情还是一如既往的好&#xff0c;明天就周一啦&#xff0c;虽然我暂时不用上班&#xff0c;但是希望大家新的一周元气满满~ 今日观看了石群老…

ANAH数据集- 大模型幻觉细粒度评估工具

大型语言模型&#xff08;LLMs&#xff09;在各种自然语言处理任务中取得了显著的性能提升。然而&#xff0c;它们在回答用户问题时仍面临一个令人担忧的问题&#xff0c;即幻觉&#xff0c;它们会产生听起来合理但不符合事实或无意义的信息&#xff0c;尤其是当问题需要大量知…

MongoDB-4.2.1 之安装和使用

安装 下载安装包 我自己电脑是 Windows7 的老古董&#xff0c;所以就下载老版本的 MongoDB。 mongodb: https://fastdl.mongodb.org/win32/mongodb-win32-x86_64-2012plus-4.2.1.zip 解压安装包到指定路径 我解压到的 C 盘 C:\mongodb-4.2.1 添加环境变量 创建数据库和…

gitlab服务器迁移(亲测有效)

描述&#xff1a;最近公司迁移gitlab&#xff0c;我没有迁移过&#xff0c;经过网上查找资料最终完成迁移&#xff0c;途中也遇到挺多坑和两个问题&#xff0c;希望能帮到你。 新服务器安装gitlab 注意&#xff1a;新服务器gitlab版本也需要和旧版本一致。 首先查看原Gitlab…

渗透测试靶机----FirstLeads_1.3

渗透测试靶机----FirstLeads_1.3 启动靶机&#xff0c;扫描ip&#xff0c;平平无奇 可以看出&#xff0c;这里是存在139这个主机的&#xff0c;这就是扫描出来的靶机ip继续探测端口及其他信息 发现这里只开启了80 端口 尝试一些基本目录&#xff0c;发现存在robot.txt文件目录…

WIN10环境下xposed环境搭建

禁止拿来干坏事&#xff0c;仅做学习为目的 环境需求 1.夜神模拟器7.1 2.Android stdio 2022.3.1 3. Adb环境配置 具体实现 1.安装xposed 打开可一键安装&#xff0c;重启 2.连接虚拟机 adb connect 127.0.0.1:620013.打开as,进入project 4.在lib下添加准备好的jar包 …