Flink SQL Redis Connector

Anyone who regularly does Flink development knows that connecting Flink to Redis is fairly painful, and parsing the Redis data is even more so. If Redis could be connected with Flink SQL like an ordinary database, with the data displayed as a table, it would be very convenient.

After many days of work, I have finally finished the Flink SQL Redis connector. It has been tested, and the data can be parsed with SQL. The code and execution results are shown below; the complete code is available on my GitHub: https://github.com/niuhu3/flink_sql_redis_connector.git

The connector has been submitted to Flink; see [FLINK-35588] flink sql redis connector - ASF JIRA (apache.org)

I hope you can fork and star the repository. I will keep updating this connector, and you are welcome to try it out. If you run into any problems, you can give me feedback directly or in the community, and feel free to contact me with any ideas.

1. Usage Examples and Explanation

1. Reading data example

CREATE TABLE orders (
  `order_id` STRING,
  `price` STRING,
  `order_time` STRING,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'single',
  'single.host' = '192.168.10.101',
  'single.port' = '6379',
  'password' = 'xxxxxx',
  'command' = 'hgetall',
  'key' = 'orders'
);


select * from orders




# Cluster mode
CREATE TABLE redis_sink (
  `site_id` STRING,
  `inverter_id` STRING,
  `start_time` STRING,
   PRIMARY KEY(site_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'cluster',
  'cluster.nodes' = 'test3:7001,test3:7002,test3:7003,test3:8001,test3:8002,test3:8003',
  'password' = '123123',
  'command' = 'hgetall',
  'key' = 'site_inverter'
);

cluster.nodes defines the cluster nodes as host:port pairs, e.g.: host1:port1,host2:port2,host3:port3

Note: a Redis table must define a primary key, either a single column or a composite key, as sketched below.
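For example, a table with a composite primary key can be declared like this (a minimal sketch; the connector joins all key columns into the Redis key):

CREATE TABLE site_inverter (
  `site_id` STRING,
  `inverter_id` STRING,
  `start_time` STRING,
   PRIMARY KEY(site_id, inverter_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'single',
  'single.host' = '192.168.10.101',
  'single.port' = '6379',
  'password' = 'xxxxxx',
  'command' = 'hgetall',
  'key' = 'site_inverter'
);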

The following is the SQL read result: the Redis data is parsed directly into the table layout we need.

2. Writing data example

1. generate source data
CREATE TABLE order_source (
  `order_number` BIGINT,
  `price` DECIMAL(32,2),
  `order_time` TIMESTAMP(3),
   PRIMARY KEY(order_number) NOT ENFORCED
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '5',
  'fields.order_number.min' = '1',
  'fields.order_number.max' = '20',
  'fields.price.min' = '1001',
  'fields.price.max' = '1100'
);

2. define redis sink table 

CREATE TABLE redis_sink (
  `order_number` STRING,
  `price` STRING,
  `order_time` STRING,
   PRIMARY KEY(order_number) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'single',
  'single.host' = '192.168.10.101',
  'single.port' = '6379',
  'password' = 'xxxxxx',
  'command' = 'hmset',
  'key' = 'orders'
);

3. insert data to redis sink table (cast data type to string)

insert into redis_sink
	select
		cast(order_number as STRING) order_number,
		cast(price as STRING) price,
		cast(order_time as STRING) order_time
	from order_source;
	

Redis does not store data types, so every value must be cast to STRING before being written. The following is the result of writing to Redis. The Redis key is built by concatenating key + primary key column name + primary key value, which guarantees that every row is unique; this is also why a Redis table must define a primary key.
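For illustration, assuming a row with order_number = 10 was written by the hmset example above, the data in Redis would look roughly like this (the values shown are hypothetical):

127.0.0.1:6379> keys orders*
1) "orders:order_number:10"
127.0.0.1:6379> hgetall orders:order_number:10
1) "price"
2) "1024.50"
3) "order_time"
4) "2024-06-21 12:00:00.000"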

3. Currently supported features

1. The connector currently supports multiple write and read commands:

        Read:   get    hget     hgetall     hscan   lrange    smembers    zrange

        Write:  set    hset     hmset       lpush   rpush     sadd

2. For the most common hash data, fuzzy matching is supported: if only the table name is given as the key, the whole table is queried (for example, key = 'orders' matches orders:order_number:1, orders:order_number:2, and so on).

4. Connector options

| Option                     | Required | Default | Type    | Description                                                              |
| -------------------------- | -------- | ------- | ------- | ------------------------------------------------------------------------ |
| connector                  | required | (none)  | String  | connector name                                                            |
| mode                       | required | (none)  | String  | redis cluster mode (single or cluster)                                    |
| single.host                | optional | (none)  | String  | redis host in single mode                                                 |
| single.port                | optional | (none)  | Int     | redis port in single mode                                                 |
| cluster.nodes              | optional | (none)  | String  | redis cluster nodes (host1:port1,host2:port2,...), used in cluster mode   |
| password                   | optional | (none)  | String  | redis database password                                                   |
| command                    | required | (none)  | String  | redis command for writing or reading data                                 |
| key                        | required | (none)  | String  | redis key                                                                 |
| expire                     | optional | (none)  | Int     | key TTL in seconds                                                        |
| field                      | optional | (none)  | String  | field to read when using the hget command                                 |
| cursor                     | optional | (none)  | Int     | cursor when using the hscan command (e.g. 1, 2)                           |
| start                      | optional | 0       | Int     | start index when using the lrange command                                 |
| end                        | optional | 10      | Int     | end index when using the lrange command                                   |
| connection.max.wait-mills  | optional | (none)  | Int     | redis connection parameter                                                |
| connection.timeout-ms      | optional | (none)  | Int     | redis connection parameter                                                |
| connection.max-total       | optional | (none)  | Int     | redis connection parameter                                                |
| connection.max-idle        | optional | (none)  | Int     | redis connection parameter                                                |
| connection.test-on-borrow  | optional | (none)  | Boolean | redis connection parameter                                                |
| connection.test-on-return  | optional | (none)  | Boolean | redis connection parameter                                                |
| connection.test-while-idle | optional | (none)  | Boolean | redis connection parameter                                                |
| so.timeout-ms              | optional | (none)  | Int     | redis connection parameter                                                |
| max.attempts               | optional | (none)  | Int     | redis connection parameter                                                |
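For example, the connection pool options can be appended to any table definition; the table name and option values below are purely illustrative:

CREATE TABLE orders_tuned (
  `order_id` STRING,
  `price` STRING,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'single',
  'single.host' = '192.168.10.101',
  'single.port' = '6379',
  'password' = 'xxxxxx',
  'command' = 'hgetall',
  'key' = 'orders',
  'connection.timeout-ms' = '2000',
  'connection.max-total' = '8',
  'connection.max-idle' = '4',
  'connection.test-on-borrow' = 'true'
);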

2. Dynamic Source and Sink Factory Class

import org.apache.flink.common.RedisOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.sink.RedisDynamicTableSink;
import org.apache.flink.source.RedisDynamicTableSource;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;


public class RedisSourceSinkFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {


    
    private ReadableConfig options;
    public RedisSourceSinkFactory(){}

    public RedisSourceSinkFactory(ReadableConfig options){
        this.options = options;
    }

    
    //Implementation of DynamicTableSourceFactory; this interface must be implemented to read data with Flink SQL
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        options = helper.getOptions();
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        List<Column> columns = schema.getColumns();
        ArrayList<String> columnNames = new ArrayList<>();
        columns.forEach(column -> columnNames.add(column.getName()));
        List<String> primaryKey = schema.getPrimaryKey().get().getColumns();
        return new RedisDynamicTableSource(options,columnNames,primaryKey);

    }
    
    //Implementation of DynamicTableSinkFactory; this interface must be implemented to write data to Redis with Flink SQL
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        List<Column> columns = schema.getColumns();
        ArrayList<String> columnNames = new ArrayList<>();
        columns.forEach(column -> columnNames.add(column.getName()));
        List<String> primaryKey = schema.getPrimaryKey().get().getColumns();
        ReadableConfig options = helper.getOptions();
        return new RedisDynamicTableSink(options,columnNames,primaryKey);
    }



    @Override
    public String factoryIdentifier() {
        return "redis";
    }

    //required options of the SQL connector
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        HashSet<ConfigOption<?>> options = new HashSet<>();
        options.add(RedisOptions.PASSWORD);
        options.add(RedisOptions.KEY);
        options.add(RedisOptions.MODE);
        return options;
    }
    
    //optional options of the SQL connector
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        HashSet<ConfigOption<?>> options = new HashSet<>();
        options.add(RedisOptions.SINGLE_HOST);
        options.add(RedisOptions.SINGLE_PORT);
        options.add(RedisOptions.CLUSTER_NODES);
        options.add(RedisOptions.FIELD);
        options.add(RedisOptions.CURSOR);
        options.add(RedisOptions.EXPIRE);
        options.add(RedisOptions.COMMAND);
        options.add(RedisOptions.START);
        options.add(RedisOptions.END);
        options.add(RedisOptions.CONNECTION_MAX_TOTAL);
        options.add(RedisOptions.CONNECTION_MAX_IDLE);
        options.add(RedisOptions.CONNECTION_TEST_WHILE_IDLE);
        options.add(RedisOptions.CONNECTION_TEST_ON_BORROW);
        options.add(RedisOptions.CONNECTION_TEST_ON_RETURN);
        options.add(RedisOptions.CONNECTION_TIMEOUT_MS);
        options.add(RedisOptions.TTL_SEC);
        options.add(RedisOptions.LOOKUP_ADDITIONAL_KEY);
        options.add(RedisOptions.LOOKUP_CACHE_MAX_ROWS);
        options.add(RedisOptions.LOOKUP_CACHE_TTL_SEC);

        return options;
    }
}
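The RedisOptions class referenced throughout is not shown in this post. For reference, here is a minimal sketch of how such constants are typically declared with Flink's ConfigOptions builder (the option names are taken from the table above; the defaults are assumptions, not necessarily the repo's actual values):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class RedisOptions {
    // redis cluster mode (single or cluster)
    public static final ConfigOption<String> MODE =
            ConfigOptions.key("mode").stringType().noDefaultValue()
                    .withDescription("redis cluster mode (single or cluster)");

    // host and port for single mode
    public static final ConfigOption<String> SINGLE_HOST =
            ConfigOptions.key("single.host").stringType().noDefaultValue()
                    .withDescription("redis host in single mode");
    public static final ConfigOption<Integer> SINGLE_PORT =
            ConfigOptions.key("single.port").intType().defaultValue(6379)
                    .withDescription("redis port in single mode");

    // start/end indexes for the lrange command
    public static final ConfigOption<Integer> START =
            ConfigOptions.key("start").intType().defaultValue(0)
                    .withDescription("start index for the lrange command");
    public static final ConfigOption<Integer> END =
            ConfigOptions.key("end").intType().defaultValue(10)
                    .withDescription("end index for the lrange command");

    // ...the remaining options follow the same pattern
}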

3. Redis Source Class

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.util.Preconditions;

import java.util.List;

public class RedisDynamicTableSource implements ScanTableSource {

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;


    public RedisDynamicTableSource(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);

    }



    @Override
    public DynamicTableSource copy() {

        return new RedisDynamicTableSource(this.options, this.columns, this.primaryKey);
    }




    @Override
    public String asSummaryString() {
        return "redis table source";
    }


    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.all();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {

        RedisSourceFunction redisSourceFunction = new RedisSourceFunction(this.options, this.columns, this.primaryKey);
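        // the second boolean argument ('false') marks the source as unbounded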
        return SourceFunctionProvider.of(redisSourceFunction,false);
    }
}

The source function supports reading Redis string, set, zset, and hash data and parsing it into RowData for Flink:

import org.apache.flink.common.RedisClusterMode;
import org.apache.flink.common.RedisCommandOptions;
import org.apache.flink.common.RedisOptions;
import org.apache.flink.common.RedisSplitSymbol;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.ScanResult;

import java.util.*;


public class RedisSourceFunction extends RichSourceFunction<RowData>{

    private static final Logger LOG = LoggerFactory.getLogger(RedisSourceFunction.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;
    private Jedis jedis;
    private JedisCluster jedisCluster;
    private String value;
    private String field;
    private String[] fields;
    private String cursor;
    private Integer start;
    private Integer end;
    private String[] keySplit;
    private static int position = 1;
    private GenericRowData rowData;
    public RedisSourceFunction(ReadableConfig options, List<String> columns, List<String> primaryKey){
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);

    }


    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {

        String password = options.get(RedisOptions.PASSWORD);
        Preconditions.checkNotNull(password,"password is null,please set value for password");
        Integer expire = options.get(RedisOptions.EXPIRE);
        String key = options.get(RedisOptions.KEY);
        Preconditions.checkNotNull(key,"key is null,please set value for key");
        String[] keyArr = key.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
        String command = options.get(RedisOptions.COMMAND);
        Preconditions.checkNotNull(command,"command is null,please set value for command");

        // if the configured command is a write command, this source has nothing to read, so stop here
        List<String> sourceCommand = Arrays.asList(RedisCommandOptions.SET, RedisCommandOptions.HSET, RedisCommandOptions.HMSET, RedisCommandOptions.LPUSH,
                RedisCommandOptions.RPUSH, RedisCommandOptions.SADD);
        if(sourceCommand.contains(command.toUpperCase())){ return;}

        String mode = options.get(RedisOptions.MODE);
        Preconditions.checkNotNull(mode,"mode is null,please set value for mode");
        Integer maxIdle = options.get(RedisOptions.CONNECTION_MAX_IDLE);
        Integer maxTotal = options.get(RedisOptions.CONNECTION_MAX_TOTAL);
        Integer maxWaitMills = options.get(RedisOptions.CONNECTION_MAX_WAIT_MILLS);

        Boolean testOnBorrow = options.get(RedisOptions.CONNECTION_TEST_ON_BORROW);
        Boolean testOnReturn = options.get(RedisOptions.CONNECTION_TEST_ON_RETURN);
        Boolean testWhileIdle = options.get(RedisOptions.CONNECTION_TEST_WHILE_IDLE);


        if(mode.toUpperCase().equals(RedisClusterMode.SINGLE.name())){

            String host = options.get(RedisOptions.SINGLE_HOST);
            Integer port = options.get(RedisOptions.SINGLE_PORT);
            JedisPool jedisPool = RedisUtil.getSingleJedisPool(mode, host, port, maxTotal,
                    maxIdle, maxWaitMills, testOnBorrow, testOnReturn, testWhileIdle);
            jedis = jedisPool.getResource();
            jedis.auth(password);



            switch (command.toUpperCase()){
                        case RedisCommandOptions.GET:
                            value = jedis.get(key);
                            rowData = new GenericRowData(2);
                            rowData.setField(0,BinaryStringData.fromString(key));
                            rowData.setField(1,BinaryStringData.fromString(value));
                            break;

                        case RedisCommandOptions.HGET:
                            field = options.get(RedisOptions.FIELD);
                            value = jedis.hget(key, field);
                            rowData = new GenericRowData(3);
                            keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);

                            // pk values sit at split positions 2*i + 2 (key layout: table:pk1:v1[:pk2:v2...])
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i,BinaryStringData.fromString(keySplit[2 * i + 2]));
                            }
                            rowData.setField(primaryKey.size(),BinaryStringData.fromString(value));
                            break;

                        case RedisCommandOptions.HGETALL:
                            if (keyArr.length > 1){
                                for (String str : keyArr) {
                                    rowData = new GenericRowData(columns.size());
                                    keySplit = str.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);

                                    for (int i = 0; i < primaryKey.size(); i++) {
                                        rowData.setField(i,BinaryStringData.fromString(keySplit[2 * i + 2]));
                                    }

                                    for (int i = primaryKey.size(); i < columns.size(); i++) {
                                        String value = jedis.hget(str, columns.get(i));
                                        rowData.setField(i,BinaryStringData.fromString(value));
                                    }
                                    ctx.collect(rowData);
                                }

                            }else if(key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT).length == (primaryKey.size() * 2 + 1)){
                                rowData = new GenericRowData(columns.size());
                                keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                                for (int i = 0; i < primaryKey.size(); i++) {
                                    rowData.setField(i,BinaryStringData.fromString(keySplit[2 * i + 2]));
                                }

                                for (int i = primaryKey.size(); i < columns.size(); i++) {
                                    String value = jedis.hget(key, columns.get(i));
                                    rowData.setField(i,BinaryStringData.fromString(value));
                                }

                                ctx.collect(rowData);

                            }else{
                                //Fuzzy matching ,gets the data of the entire table
                                String fuzzyKey = new StringBuffer(key).append("*").toString();
                                Set<String> keys = jedis.keys(fuzzyKey);
                                for (String keyStr : keys) {
                                    keySplit = keyStr.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                                    rowData = new GenericRowData(columns.size());
                                    for (int i = 0; i < primaryKey.size(); i++) {
                                        rowData.setField(i,BinaryStringData.fromString(keySplit[2 * i + 2]));
                                    }

                                    for (int i = primaryKey.size(); i < columns.size(); i++) {
                                        String value = jedis.hget(keyStr, columns.get(i));
                                        rowData.setField(i,BinaryStringData.fromString(value));
                                    }

                                    ctx.collect(rowData);

                                }
                            }

                            break;

                        case RedisCommandOptions.HSCAN:
                            cursor = options.get(RedisOptions.CURSOR);
                            ScanResult<Map.Entry<String, String>> entries = jedis.hscan(key, cursor);
                            List<Map.Entry<String, String>> result = entries.getResult();
                            keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            rowData = new GenericRowData(columns.size());
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i,BinaryStringData.fromString(keySplit[2 * i + 2]));
                            }

                            position = primaryKey.size();
                            for (int i = 0; i < result.size(); i++) {
                                value = result.get(i).getValue();
                                rowData.setField(position,BinaryStringData.fromString(value));
                                position++;
                            }
                            break;

                        case RedisCommandOptions.LRANGE:
                            start = options.get(RedisOptions.START);
                            end = options.get(RedisOptions.END);
                            List<String> list = jedis.lrange(key, start, end);
                            rowData = new GenericRowData(list.size() +1);
                            rowData.setField(0,BinaryStringData.fromString(key));
                            list.forEach(s -> {
                                rowData.setField(position,BinaryStringData.fromString(s));
                                position++;});

                            break;

                        case RedisCommandOptions.SMEMBERS:
                            Set<String> smembers = jedis.smembers(key);
                            rowData = new GenericRowData(smembers.size() +1);
                            rowData.setField(0,BinaryStringData.fromString(key));
                            smembers.forEach(s -> {
                                rowData.setField(position,BinaryStringData.fromString(s));
                                position++;});
                            break;

                        case RedisCommandOptions.ZRANGE:
                            start = options.get(RedisOptions.START);
                            end = options.get(RedisOptions.END);
                            Set<String> sets = jedis.zrange(key, start, end);
                            rowData = new GenericRowData(sets.size() +1);
                            rowData.setField(0,BinaryStringData.fromString(key));
                            sets.forEach(s -> {
                                rowData.setField(position,BinaryStringData.fromString(s));
                                position++;});
                            break;


                        default:
                            LOG.error("Cannot process such data type: {}", command);
                            break;
                    }

                    if(!command.toUpperCase().equals(RedisCommandOptions.HGETALL)){
                        ctx.collect(rowData);
                    }



            }else if(mode.toUpperCase().equals(RedisClusterMode.CLUSTER.name())){
            String nodes = options.get(RedisOptions.CLUSTER_NODES);
            String[] hostAndPorts = nodes.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
            String[] host = new String[hostAndPorts.length];
            int[] port = new int[hostAndPorts.length];

            for (int i = 0; i < hostAndPorts.length; i++) {
                String[] splits = hostAndPorts[i].split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                host[i] = splits[0];
                port[i] = Integer.parseInt(splits[1]);
            }
            Integer connTimeOut = options.get(RedisOptions.CONNECTION_TIMEOUT_MS);
            Integer soTimeOut = options.get(RedisOptions.SO_TIMEOUT_MS);
            Integer maxAttempts = options.get(RedisOptions.MAX_ATTEMPTS);

            jedisCluster = RedisUtil.getJedisCluster(mode, host, password, port, maxTotal,
                    maxIdle, maxWaitMills, connTimeOut, soTimeOut, maxAttempts, testOnBorrow, testOnReturn, testWhileIdle);

            switch (command.toUpperCase()){
                case RedisCommandOptions.GET:
                    value = jedisCluster.get(key);
                    rowData = new GenericRowData(2);
                    rowData.setField(0,BinaryStringData.fromString(key));
                    rowData.setField(1,BinaryStringData.fromString(value));
                    break;

                case RedisCommandOptions.HGET:
                    field = options.get(RedisOptions.FIELD);
                    value = jedisCluster.hget(key, field);
                    rowData = new GenericRowData(3);
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);

                    // pk values sit at split positions 2*i + 2 (key layout: table:pk1:v1[:pk2:v2...])
                    for (int i = 0; i < primaryKey.size(); i++) {
                        rowData.setField(i,BinaryStringData.fromString(keySplit[2 * i + 2]));
                    }
                    rowData.setField(primaryKey.size(),BinaryStringData.fromString(value));
                    break;

                case RedisCommandOptions.HGETALL:
                    if (keyArr.length > 1){
                        for (String str : keyArr) {
                            rowData = new GenericRowData(columns.size());
                            keySplit = str.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);

                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i,BinaryStringData.fromString(keySplit[2 * i + 2]));
                            }

                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedisCluster.hget(str, columns.get(i));
                                rowData.setField(i,BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }

                    }else if(key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT).length == (primaryKey.size() * 2 + 1)){
                        rowData = new GenericRowData(columns.size());
                        keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        for (int i = 0; i < primaryKey.size(); i++) {
                            rowData.setField(i,BinaryStringData.fromString(keySplit[2 * i + 2]));
                        }

                        for (int i = primaryKey.size(); i < columns.size(); i++) {
                            String value = jedisCluster.hget(key, columns.get(i));
                            rowData.setField(i,BinaryStringData.fromString(value));
                        }

                        ctx.collect(rowData);

                    }else{
                        //Fuzzy matching ,gets the data of the entire table
                        String fuzzyKey = new StringBuffer(key).append("*").toString();
                        Set<String> keys = jedisCluster.keys(fuzzyKey);
                        for (String keyStr : keys) {
                            keySplit = keyStr.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            rowData = new GenericRowData(columns.size());
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i,BinaryStringData.fromString(keySplit[2 * i + 2]));
                            }

                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedisCluster.hget(keyStr, columns.get(i));
                                rowData.setField(i,BinaryStringData.fromString(value));
                            }

                            ctx.collect(rowData);

                        }
                    }

                    break;

                case RedisCommandOptions.HSCAN:
                    cursor = options.get(RedisOptions.CURSOR);
                    ScanResult<Map.Entry<String, String>> entries = jedisCluster.hscan(key, cursor);
                    List<Map.Entry<String, String>> result = entries.getResult();
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    rowData = new GenericRowData(columns.size());
                    for (int i = 0; i < primaryKey.size(); i++) {
                        rowData.setField(i,BinaryStringData.fromString(keySplit[2 * i + 2]));
                    }

                    position = primaryKey.size();
                    for (int i = 0; i < result.size(); i++) {
                        value = result.get(i).getValue();
                        rowData.setField(position,BinaryStringData.fromString(value));
                        position++;
                    }
                    break;



                case RedisCommandOptions.LRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    List<String> list = jedisCluster.lrange(key, start, end);
                    rowData = new GenericRowData(list.size() +1);
                    rowData.setField(0,BinaryStringData.fromString(key));
                    list.forEach(s -> {
                        rowData.setField(position,BinaryStringData.fromString(s));
                        position++;});

                    break;

                case RedisCommandOptions.SMEMBERS:
                    Set<String> smembers = jedisCluster.smembers(key);
                    rowData = new GenericRowData(smembers.size() +1);
                    rowData.setField(0,BinaryStringData.fromString(key));
                    smembers.forEach(s -> {
                        rowData.setField(position,BinaryStringData.fromString(s));
                        position++;});
                    break;

                case RedisCommandOptions.ZRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    Set<String> sets = jedisCluster.zrange(key, start, end);
                    rowData = new GenericRowData(sets.size() +1);
                    rowData.setField(0,BinaryStringData.fromString(key));
                    sets.forEach(s -> {
                        rowData.setField(position,BinaryStringData.fromString(s));
                        position++;});
                    break;


                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }

            if(!command.toUpperCase().equals(RedisCommandOptions.HGETALL)){
                ctx.collect(rowData);
            }

        }else{
            LOG.error("Unsupport such {} mode",mode);
        }




    }

    @Override
    public void cancel() {

        if(jedis != null){
            jedis.close();
        }

        if(jedisCluster != null){
            jedisCluster.close();
        }

    }
}
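To make the key handling in the source and sink easier to follow, here is a minimal, self-contained sketch of the encode/decode round trip they perform (the table name and values are made up for illustration):

import java.util.Arrays;
import java.util.List;

public class RedisKeyRoundTrip {
    public static void main(String[] args) {
        // sink side: key = table key + ':' + pk column name + ':' + pk value, per primary key column
        String tableKey = "site_inverter";
        List<String> pkCols = Arrays.asList("site_id", "inverter_id");
        List<String> pkVals = Arrays.asList("s01", "inv07");
        StringBuilder redisKey = new StringBuilder(tableKey);
        for (int i = 0; i < pkCols.size(); i++) {
            redisKey.append(':').append(pkCols.get(i)).append(':').append(pkVals.get(i));
        }
        System.out.println(redisKey); // site_inverter:site_id:s01:inverter_id:inv07

        // source side: split by ':'; the i-th primary key value sits at position 2 * i + 2
        String[] keySplit = redisKey.toString().split(":");
        for (int i = 0; i < pkCols.size(); i++) {
            System.out.println(pkCols.get(i) + " = " + keySplit[2 * i + 2]);
        }
    }
}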

4. Redis Sink Class

package org.apache.flink.sink;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;


public class RedisDynamicTableSink implements DynamicTableSink {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(RedisDynamicTableSink.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;

    public RedisDynamicTableSink(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.DELETE)
                .addContainedKind(RowKind.UPDATE_BEFORE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .build();

    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        RedisSinkFunction myRedisSinkFunction = new RedisSinkFunction(this.options,this.columns,this.primaryKey);
        return SinkFunctionProvider.of(myRedisSinkFunction);

    }

    @Override
    public DynamicTableSink copy() {
        return new RedisDynamicTableSink(this.options,this.columns,this.primaryKey);
    }

    @Override
    public String asSummaryString() {
        return "redis table sink";
    }
}
package org.apache.flink.sink;

import org.apache.flink.common.RedisClusterMode;
import org.apache.flink.common.RedisCommandOptions;
import org.apache.flink.common.RedisOptions;
import org.apache.flink.common.RedisSplitSymbol;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;

import java.util.List;


public class RedisSinkFunction extends RichSinkFunction<RowData>{

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(RedisSinkFunction.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;
    private String fields;
    private Jedis jedis;
    private JedisCluster jedisCluster;
    private String[] fieldsArr;
    private StringBuffer redisTableKey;
    private String value;

    public RedisSinkFunction(ReadableConfig options, List<String> columns, List<String> primaryKey){

        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }


    @Override
    public void invoke(RowData rowData, Context context) throws Exception {

        String password = options.get(RedisOptions.PASSWORD);
        Preconditions.checkNotNull(password,"password is null,please set value for password");
        Integer expire = options.get(RedisOptions.EXPIRE);
        String key = options.get(RedisOptions.KEY);
        Preconditions.checkNotNull(key,"key is null,please set value for key");
        String command = options.get(RedisOptions.COMMAND);
        Preconditions.checkNotNull(command,"command is null,please set value for command");
        String mode = options.get(RedisOptions.MODE);
        Preconditions.checkNotNull(command,"mode is null,please set value for mode");

        Integer maxIdle = options.get(RedisOptions.CONNECTION_MAX_IDLE);
        Integer maxTotal = options.get(RedisOptions.CONNECTION_MAX_TOTAL);
        Integer maxWaitMills = options.get(RedisOptions.CONNECTION_MAX_WAIT_MILLS);

        Boolean testOnBorrow = options.get(RedisOptions.CONNECTION_TEST_ON_BORROW);
        Boolean testOnReturn = options.get(RedisOptions.CONNECTION_TEST_ON_RETURN);
        Boolean testWhileIdle = options.get(RedisOptions.CONNECTION_TEST_WHILE_IDLE);


        if (mode.toUpperCase().equals(RedisClusterMode.SINGLE.name())) {

            String host = options.get(RedisOptions.SINGLE_HOST);
            Integer port = options.get(RedisOptions.SINGLE_PORT);
            JedisPool jedisPool = RedisUtil.getSingleJedisPool(mode, host, port, maxTotal,
                    maxIdle, maxWaitMills, testOnBorrow, testOnReturn, testWhileIdle);
            jedis = jedisPool.getResource();
            jedis.auth(password);

            switch (command.toUpperCase()){
                case RedisCommandOptions.SET:
                    value = rowData.getString(0).toString();
                    jedis.set(String.valueOf(key),String.valueOf(value));
                    break;

                case RedisCommandOptions.HSET:

                    String field = columns.get(1);
                    //construct redis key:table_name:primary key col name: primary key value
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        if(primaryKey.size() <= 1){
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        }else{
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        if (i != primaryKey.size() - 1){
                            redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        }
                    }

                    value = rowData.getString(1).toString();
                    jedis.hset(String.valueOf(redisTableKey),String.valueOf(field),String.valueOf(value));
                    break;

                case RedisCommandOptions.HMSET:
                    //construct redis key:table_name:primary key col name: primary key value
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);

                    for (int i = 0; i < primaryKey.size(); i++) {
                        if(primaryKey.size() <= 1){
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        }else{
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        if (i != primaryKey.size() -1){
                            redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        }

                    }
                    for (int i = 1; i < columns.size(); i++) {
                        if (!primaryKey.contains(columns.get(i))){
                            value = rowData.getString(i).toString();
                            jedis.hset(String.valueOf(redisTableKey),String.valueOf(columns.get(i)),String.valueOf(value));
                        }
                    }

                    break;

                case RedisCommandOptions.LPUSH:
                    value = rowData.getString(0).toString();
                    jedis.lpush(key,value);

                    break;

                case RedisCommandOptions.RPUSH:
                    value = rowData.getString(0).toString();
                    jedis.rpush(key,value);

                    break;

                case RedisCommandOptions.SADD:
                    value = rowData.getString(0).toString();
                    jedis.sadd(key,value);
                    break;

                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }


        }else if(mode.toUpperCase().equals(RedisClusterMode.CLUSTER.name())){
            String nodes = options.get(RedisOptions.CLUSTER_NODES);
            String[] hostAndPorts = nodes.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
            String[] host = new String[hostAndPorts.length];
            int[] port = new int[hostAndPorts.length];

            for (int i = 0; i < hostAndPorts.length; i++) {
                String[] splits = hostAndPorts[i].split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                host[i] = splits[0];
                port[i] = Integer.parseInt(splits[1]);
            }
            Integer connTimeOut = options.get(RedisOptions.CONNECTION_TIMEOUT_MS);
            Integer soTimeOut = options.get(RedisOptions.SO_TIMEOUT_MS);
            Integer maxAttempts = options.get(RedisOptions.MAX_ATTEMPTS);

            jedisCluster = RedisUtil.getJedisCluster(mode, host, password, port, maxTotal,
                    maxIdle, maxWaitMills, connTimeOut, soTimeOut, maxAttempts, testOnBorrow, testOnReturn, testWhileIdle);

            switch (command.toUpperCase()){
                case RedisCommandOptions.SET:
                    value = rowData.getString(0).toString();
                    jedisCluster.set(String.valueOf(key),String.valueOf(value));
                    break;

                case RedisCommandOptions.HSET:

                    String field = columns.get(1);
                    //construct redis key:table_name:primary key col name: primary key value
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);

                    for (int i = 0; i < primaryKey.size(); i++) {
                        if(primaryKey.size() <= 1){
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        }else{
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        if (i != primaryKey.size() - 1){
                            redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        }
                    }

                    value = rowData.getString(1).toString();
                    jedisCluster.hset(String.valueOf(redisTableKey),String.valueOf(field),String.valueOf(value));
                    break;

                case RedisCommandOptions.HMSET:
                    //construct redis key:table_name:primary key col name: primary key value
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);

                    for (int i = 0; i < primaryKey.size(); i++) {
                        if(primaryKey.size() <= 1){
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        }else{
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        if (i != primaryKey.size() - 1){
                            redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        }
                    }

                    for (int i = 1; i < columns.size(); i++) {
                        if (!primaryKey.contains(columns.get(i))){
                            value = rowData.getString(i).toString();
                            jedisCluster.hset(String.valueOf(redisTableKey),String.valueOf(columns.get(i)),String.valueOf(value));
                        }
                    }

                    break;

                case RedisCommandOptions.LPUSH:
                    value = rowData.getString(0).toString();
                    jedisCluster.lpush(key,value);

                    break;


                case RedisCommandOptions.RPUSH:
                    value = rowData.getString(0).toString();
                    jedisCluster.rpush(key,value);

                    break;

                case RedisCommandOptions.SADD:
                    value = rowData.getString(0).toString();
                    jedisCluster.sadd(key,value);
                    break;


                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }




        }else{
            LOG.error("Unsupport such {} mode",mode);
        }

    }

    @Override
    public void close() throws Exception {
        if(jedis != null){
            jedis.close();
        }

        if(jedisCluster != null){
            jedisCluster.close();
        }

    }
}
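One packaging detail the post does not show: Flink discovers table factories through Java SPI, so for 'connector' = 'redis' to resolve at runtime, the connector jar must contain a service file holding the fully qualified name of the factory class. A sketch (the package name here is an assumption; use the actual package from the repository):

# src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
org.apache.flink.factory.RedisSourceSinkFactory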

If you do not understand why the code above is written this way, you can refer to my previous post:

Flink SQL - User-defined Sources & Sinks (CSDN blog)

Finally, once again, I hope you will support the project on GitHub or in the community, so that this connector can be officially open-sourced.
