import random
import time
import requests
import base64
import os
import datetime
import numpy as np
import cv2
from PIL import Image
import argparse
class IMGNetProcess(object):
    """
    Client for Baidu's online AI image-processing endpoints.

    An instance wraps one local image file and one processing mode
    (``kind``).  :meth:`process` uploads the image, polls until the task is
    reported as generated and stores the results in the ``imgs`` directory.
    """

    # Seconds after which a single HTTP request is abandoned, so a stalled
    # connection cannot hang the tool forever.
    REQUEST_TIMEOUT = 30

    # Level used when the user types nothing, garbage or an out-of-range value.
    DEFAULT_LEVEL = 5

    def __init__(self, file, kind):
        """
        :param file: path of the image to process
        :param kind: processing mode; must be one of the ``feature_map`` keys
        """
        self.file = file
        # Normalised to str so that both 9 and "9" hit the dispatch table and
        # the ``self.kind == "9"`` check in get_result.
        self.kind = str(kind)
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36"
        }
        # mode code -> method returning (extra form fields, description)
        self.feature_map = {
            "1": self.move_water_mark,
            "3": self.clear_img,
            "4": self.extend_img,
            "6": self.re_drawl,
            "7": self.img_similar,
            "9": self.img_matting,
            "14": self.new_style,
            "15": self.draw_line,
        }

    @staticmethod
    def _parse_level(raw, default=DEFAULT_LEVEL):
        """Return ``raw`` as an int in 1..6, or ``default`` when it is not one."""
        try:
            level = int(raw)
        except (TypeError, ValueError):
            return default
        return level if 1 <= level <= 6 else default

    @staticmethod
    def _pick_option(options, choice, default_key="1"):
        """Return ``options[choice]``, falling back to ``options[default_key]``."""
        return options.get(choice.strip(), options[default_key])

    @staticmethod
    def _decode_data_uri(src):
        """
        Decode the base64 payload of a ``data:<mime>;base64,<payload>`` string.

        The header is located by its comma rather than by a fixed length, so
        any mime type works; a string without a header is decoded as-is.
        """
        _header, sep, payload = src.partition(",")
        return base64.b64decode(payload if sep else src)

    def get_now_time(self):
        """
        :return: the current local time formatted as ``YYYY-mm-dd_HH-MM-SS``
        """
        return datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

    def get_out_file(self, now_time, desc, task_id, index=""):
        """
        Build the output path for one result and make sure ``imgs`` exists.

        :param now_time: timestamp string used as the file-name prefix
        :param desc: description of the operation
        :param task_id: id of the remote task
        :param index: optional position of the image within the result set
        :return: path of the ``.png`` file inside the ``imgs`` directory
        """
        img_save_path = r"imgs"
        os.makedirs(img_save_path, exist_ok=True)
        img_name = f"{now_time}_{desc}_{self.kind}_{task_id}_{index}.png"
        return os.path.join(img_save_path, img_name)

    def add_lines(self, points, desc, task_id):
        """
        Cut the polygon described by ``points`` out of the source image.

        The outline is drawn on the image, everything outside the polygon is
        blacked out, and the polygon's bounding rectangle is saved as a PNG.

        :param points: iterable of mappings with ``"x"`` and ``"y"`` entries
        :param desc: description used in the output file name
        :param task_id: id of the remote task, used in the output file name
        :return: path of the saved file
        :raises RuntimeError: if OpenCV fails to encode the result
        """
        # np.array() gives a writable copy (np.asarray over a PIL image may be
        # read-only, and OpenCV draws in place); convert("RGB") guarantees the
        # three channels that COLOR_RGB2BGR needs.
        img = np.array(Image.open(self.file).convert("RGB"))
        np_points = np.array([[point["x"], point["y"]] for point in points], np.int32)
        cv2.polylines(img, [np_points], isClosed=False, color=(255, 0, 0), thickness=2)
        mask = np.zeros(img.shape[:2], dtype=np.uint8)
        cv2.fillPoly(mask, [np_points], (255, 255, 255, 255))
        masked_image = cv2.bitwise_and(img, img, mask=mask)
        x, y, w, h = cv2.boundingRect(np_points)
        cropped_image = masked_image[y:y + h, x:x + w]
        img_file = self.get_out_file(self.get_now_time(), desc, task_id)
        # Encode in memory and write through numpy instead of cv2.imwrite.
        ok, encoded = cv2.imencode(".png", cv2.cvtColor(cropped_image, cv2.COLOR_RGB2BGR))
        if not ok:
            raise RuntimeError(f"failed to encode {img_file}")
        encoded.tofile(img_file)
        print(f" {img_file} 结果保存成功")
        return img_file

    def img_similar(self):
        """
        Ask for the similarity level (1-6, otherwise 5).

        :return: (extra form fields, description)
        """
        level = self._parse_level(input("请输入相似等级 1-6:"))
        return {"create_level": level}, "相似图"

    def img_matting(self):
        """
        :return: (extra form fields, description) for matting
        """
        return {}, "抠图"

    def new_style(self):
        """
        Ask which style to apply; anything unrecognised selects option 1.

        :return: (extra form fields, description)
        """
        style = input("请输入风格类型:\n1-->宫崎骏风\n2-->橡皮泥风\n3-->油画风\n 默认为1:")
        style_map = {
            "1": "miyazaki",
            "2": "clay",
            "3": "monet",
        }
        # The fallback must be the mapped value, not the menu key "1".
        return {"style": self._pick_option(style_map, style)}, "风格"

    def re_drawl(self):
        """
        Ask for the redraw level (1-6, otherwise 5).

        :return: (extra form fields, description)
        """
        level = self._parse_level(input("请输入重绘等级 1-6:"))
        return {"create_level": level}, "重绘"

    def extend_img(self):
        """
        Ask for the target aspect ratio; anything unrecognised selects option 1.

        :return: (extra form fields, description)
        """
        ext_ratio = input("请输入扩图比例:\n1-->1:1\n2-->4:3\n3-->3:4\n 默认为1:")
        ext_map = {
            "1": "1:1",
            "2": "4:3",
            "3": "3:4",
        }
        # The fallback must be the mapped value, not the menu key "1".
        return {"ext_ratio": self._pick_option(ext_map, ext_ratio)}, "扩图"

    def move_water_mark(self):
        """
        :return: (extra form fields, description) for watermark removal
        """
        return {}, "去水印"

    def draw_line(self):
        """
        :return: (extra form fields, description) for line art
        """
        return {}, "线稿"

    def clear_img(self):
        """
        :return: (extra form fields, description) for sharpening
        """
        return {}, "清晰"

    def base_img(self):
        """
        :return: the content of ``self.file`` as a base64 text string
        """
        with open(self.file, "rb") as fp:
            return base64.b64encode(fp.read()).decode('utf-8')

    def add_task(self, img_base_data):
        """
        Submit the image for processing.

        :param img_base_data: base64 text of the image
        :return: (task id, description of the selected mode)
        :raises ValueError: if ``self.kind`` is not a supported mode
        :raises requests.HTTPError: if the server answers with an error status
        """
        handler = self.feature_map.get(self.kind)
        if handler is None:
            supported = ", ".join(self.feature_map)
            raise ValueError(f"unsupported mode {self.kind!r}; expected one of: {supported}")
        url = "https://image.baidu.com/aigc/pccreate"
        data = {
            # The payload is always labelled image/png, whatever the file is.
            "picInfo": f"data:image/png;base64,{img_base_data}",
            "type": self.kind,
        }
        # The handler may prompt the user for mode-specific options.
        res, desc = handler()
        data.update(res)
        response = requests.post(url, headers=self.headers, data=data, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        pcEditTaskid = response.json()["pcEditTaskid"]
        print(f"任务提交成功: {pcEditTaskid} ")
        return pcEditTaskid, desc

    def get_result(self, task_id, desc):
        """
        Poll the task until it is reported as generated and save its output.

        :param task_id: id returned by :meth:`add_task`
        :param desc: description used in the output file names
        :return: list of the files that were written
        :raises requests.HTTPError: if the server answers with an error status
        """
        file_list = []
        url = "https://image.baidu.com/aigc/pcquery"
        params = {
            "taskId": f"{task_id}",
        }
        while True:
            response = requests.get(url, headers=self.headers, params=params, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            res = response.json()
            print(f"正在处理中: {res.get('progress')} %")
            if res["isGenerate"]:
                if self.kind == "9":
                    # Matting returns an outline instead of pictures; the
                    # cut-out is produced locally from the source image.
                    file_list.append(
                        self.add_lines(res["foreground"][0]["outlines"][0]["line"], desc, task_id))
                    return file_list
                if res.get('status') == 5:
                    # NOTE(review): status 5 presumably marks a failed task;
                    # the raw response is shown and nothing is saved.
                    print(res)
                    return file_list
                now_time = self.get_now_time()
                for index, pic in enumerate(res["picArr"], 1):
                    img_file = self.get_out_file(now_time, desc, task_id, index)
                    with open(img_file, "wb") as fp:
                        fp.write(self._decode_data_uri(pic["src"]))
                    file_list.append(img_file)
                    print(f" {img_file} 结果保存成功")
                return file_list
            # Randomised pause between polls.
            time.sleep(random.randint(1, 3))

    def process(self):
        """
        Run the whole pipeline: encode, submit, poll and save.

        :return: list of the files that were written
        """
        img_data = self.base_img()
        task_id, desc = self.add_task(img_data)
        return self.get_result(task_id, desc)
def build_parser():
    """
    Build the command-line parser for the tool.

    The description holds a hand-aligned, multi-line table of the available
    modes, so the raw-description formatter is used to stop argparse from
    collapsing its newlines and tabs when ``-h`` is shown.

    :return: parser exposing ``-i`` (``input_path``) and ``-m`` (``mode``)
    """
    tip_msg = "mode可选:\n1-->去水印\t3-->清晰\t\t4-->扩图\n6-->重绘 \t7-->相似\t\t9-->抠图\n14-->风格\t15-->线稿\n"
    parser = argparse.ArgumentParser(
        description=tip_msg,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument('-i', dest='input_path', help='path to the img file')
    parser.add_argument('-m', dest='mode', help='mode for process')
    return parser


if __name__ == '__main__':
    print("\t\t\t\tAI图片处理工具\n")
    args = build_parser().parse_args()
    if args.input_path and args.mode:
        # The path is made absolute before it is handed to the processor.
        imgnp = IMGNetProcess(os.path.abspath(args.input_path), args.mode)
        imgnp.process()
    else:
        print("Please provide both -i and -m arguments")