在Flink官网中，sink端只给出了常规的write API。而在实际开发场景中，我们需要将Flink处理的数据写入Kafka、HBase、Kudu等外部系统。
UML关系：自定义Sink需要实现父接口（SinkFunction），或者继承抽象类（RichSinkFunction）。
上面是Sink的继承关系
/**
 * Attaches a sink to this stream.
 *
 * <p>The method requires a {@link SinkFunction} instance. It wraps the function in a
 * {@code StreamSink} operator, builds a {@code DataStreamSink} around it, registers that
 * sink's transformation with the execution environment, and returns the sink.
 *
 * @param sinkFunction the sink function to attach
 * @return the newly created sink
 */
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
    // Result is intentionally discarded: reading the output type of the input
    // transformation coaxes out errors about MissingTypeInfo.
    transformation.getOutputType();

    // Give the function a chance to learn its input type, if it asks for it.
    if (sinkFunction instanceof InputTypeConfigurable) {
        InputTypeConfigurable typeAware = (InputTypeConfigurable) sinkFunction;
        typeAware.setInputType(getType(), getExecutionConfig());
    }

    StreamSink<T> operator = new StreamSink<>(clean(sinkFunction));
    DataStreamSink<T> result = new DataStreamSink<>(this, operator);

    getExecutionEnvironment().addOperator(result.getTransformation());
    return result;
}
SinkFunction
// SinkFunction is an interface.
public interface SinkFunction<IN> extends Function, Serializable {
// Common entry point: the two-argument form does not use the context and simply
// delegates to the single-argument invoke(value).
// NOTE(review): the single-argument invoke(IN) called below is not declared in this
// excerpt - presumably elided from the quoted source; confirm against the full interface.
default void invoke(IN value, Context context) throws Exception {
invoke(value);
}
}
RichSinkFunction
// Abstract base class that extends AbstractRichFunction and implements SinkFunction.
// The excerpt declares no members besides the serialization version constant.
@Public
public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
// Serialization version identifier for this class.
private static final long serialVersionUID = 1L;
}
其他实现SinkFunction接口的类：
案例：自定义HbaseSink
public class HbaseSink extends RichSinkFunction<Tuple2<Integer, String>> {
Logger logger = LoggerFactory.getLogger(HbaseSink.class);
org.apache.hadoop.conf.Configuration configuration;
Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//获取hbase 的链接信息
configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103");
//创建conn
connection = ConnectionFactory.createConnection(configuration);
logger.info("创建链接成功");
}
@Override
public void invoke(Tuple2<Integer, String> value, Context context) throws Exception {
//往habse 里面插入数据
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Table table = connection.getTable(TableName.valueOf("torder_count"));
Put put = new Put(value.f1.getBytes(StandardCharsets.UTF_8));
put.addColumn("info".getBytes(), // 列族
"order_total".getBytes(StandardCharsets.UTF_8), //特征字段
value.f0.toString().getBytes()); //属性值
put.addColumn("info".getBytes(), "insert_time".getBytes(), format.format(new Date(System.currentTimeMillis())).getBytes());
table.put(put);
table.close();
logger.info("=====一条数据写入成功======,时间:"+value.f1+", 值:"+value.f0);
}
@Override
public void close() throws Exception {
super.close();
connection.close();
}
通过以上案例我们熟悉了addSink函数的操作。