1. influxDB连接
首先用InfluxDBStudio软件连接influxDB数据库来查看所有表:
2. 写sql语句来查询数据
然后和平时写sql查询语句一样,先创建连接client,然后调用其query函数来查询获取数据
self.client = influxdb.InfluxDBClient(host=influx_host, port=influx_port, username=influx_username, password=influx_password, database=influx_database, timeout=influx_timeout)
query_sql = f"""select * from {table_name} \
where point = '{temperature_point_id}'\
and nowTime >= {begin_time} and nowTime < {end_time}"""
tables = self.client.query(query_sql)
3. 向influxDB数据库插入数据
插入数据时无需提前新建表,只需要指定表名直接插入即可:
self.influx_conn()
if data:
for i in range(len(data)):
data[i].update({"measurement": table_name}) # 加上表名
data[i].update({"time": now_time}) # 加上时间
self.client.write_points(data)
else:
print("data is None!")
4. 完整代码(code)
import influxdb
import pandas as pd
import datetime
import time
influx_host = "132.12.29.18"
influx_port = 8086
influx_username = "admin"
influx_password = "abc32100"
influx_database = "dqcloud"
influx_timeout = 10
def get_time_str(time_stamp): # 时间戳转字符串
time_array = time.localtime(int(time_stamp) / 1000)
time_str = time.strftime("%Y-%m-%d %H:%M:%S", time_array)
return time_str
class InfluxInfo(object):
def __init__(self):
pass
def influx_conn(self):
self.client = influxdb.InfluxDBClient(host=influx_host, port=influx_port, username=influx_username,
password=influx_password, database=influx_database, timeout=influx_timeout)
def query_data(self, table_name, temperature_point_id, begin_time, end_time):
self.influx_conn()
if type == "query":
# point:测温点id,num:测温点编号,lastTime:上次测温时间,nowTime:巡检时间,v:温度值,tr:温升值
query_sql = f"""select * from {table_name} \
where point = '{temperature_point_id}'\
and nowTime >= {begin_time} and nowTime < {end_time}"""
tables = self.client.query(query_sql)
if len(tables) > 0:
return tables._get_series()
else:
return []
def insert_data(self, table_name, now_time, data=None):
self.influx_conn()
if data:
for i in range(len(data)):
data[i].update({"measurement": table_name}) # 加上表名
data[i].update({"time": now_time}) # 加上时间
self.client.write_points(data)
else:
print("data is None!")
def get_data_by_tum(query_rslt):
"""
"time": 数据插入时间(时序库自带的,不是自己插入的)
"log": "巡检记录id"
"point": "测温点id"
"num": "测温点编号"
"lastTime": "上次测温时间"
"nowTime": "巡检时间"
"v": "温度值"
"tr": "温升值"
"overTempAlert": "超温告警:0-否,1-是"
"overTempWarning": "超温预警:0-否,1-是"
"tempRiseAlert": "温升告警:0-否,1-是"
"""
timelist = [] #
nowTime_list = []
lastTime_list = []
tr_list = []
valuelist = []
if len(query_rslt) > 0:
result = query_rslt[0]['values']
for r in result:
if len(r) > 4 and r[-1]:
timestr = r[0].split(".")[0]
timestr = timestr.replace("T", " ")
timelist.append(timestr)
lastTime_list.append(get_time_str(r[1]))
nowTime_list.append(get_time_str(r[3]))
tr_list.append(float(r[-2]))
valuelist.append(float(r[-1]))
if len(timelist) > 0:
returndata = pd.DataFrame({'time': timelist, 'lastTime': lastTime_list, 'nowTime': nowTime_list, 'tr': tr_list, 'value': valuelist})
else:
returndata = pd.DataFrame()
return returndata
if __name__ == '__main__':
info = InfluxInfo()
# "point": "测温点id"
# "nowTime": "预测时间"
# "v": "温度值"
# "tr": "温升值"
# "overTempWarning": "超温预警:0-否,1-是"
# "tempRiseWarning": "温升预警:0-否,1-是"
data = [
{
"tags": { # 标签,元数据信息和标识数据的键值对
"point": "85231",
},
"fields": { # 字段,实际的数值数据
"predictTime": 1698289692244,
"v": 44,
"tr": 0.64,
"overTempWarning": 0,
"tempRiseWarning": 0
}
},
{
"tags": {
"point": "85232",
},
"fields": {
"predictTime": 1698289692292,
"v": 45,
"tr": 1.69,
"overTempWarning": 0,
"tempRiseWarning": 1
}
}
]
now_time = datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%dT%H:%M:%SZ")
info.insert_data("point_predict_data", now_time, data)