Flink未来将与 Pulsar集成提供大规模的弹性数据处理
未来整合
Pulsar可以以不同的方式与Apache Flink集成。一些潜在的集成包括使用流式连接器为流式工作负载提供支持,并使用批量源连接器支持批量工作负载。Pulsar还提供对schema 的本地支持,可以与Flink集成并提供对数据的结构化访问,例如使用Flink SQL作为在Pulsar中查询数据的方式。最后,集成这些技术的另一种方法可能包括使用Pulsar作为Flink的状态后端。由于Pulsar具有分层架构(Streams和Segmented Streams,由Apache Bookkeeper提供支持),因此将Pulsar用作存储层并存储Flink状态变得很自然。
从体系结构的角度来看,我们可以想象两个框架之间的集成,它使用Apache Pulsar作为统一的数据层视图,Apache Flink作为统一的计算和数据处理框架和API。
现有集成
两个框架之间的集成正在进行中,开发人员已经可以通过多种方式将Pulsar与Flink结合使用。例如,Pulsar可用作Flink DataStream应用程序中的流媒体源和流式接收器。开发人员可以将Pulsar中的数据提取到Flink作业中,该作业可以计算和处理实时数据,然后将数据作为流式接收器发送回Pulsar主题。这样的例子如下所示:
// create and configure Pulsar consumer
PulsarSourceBuilder<String>builder = PulsarSourceBuilder
.builder(new SimpleStringSchema())
.serviceUrl(serviceUrl)
.topic(inputTopic)
.subscriptionName(subscription);
SourceFunction<String> src = builder.build();
// ingest DataStream with Pulsar consumer
DataStream<String> words = env.addSource(src);
// perform computation on DataStream (here a simple WordCount)
DataStream<WordWithCount> wc = words
.flatMap((FlatMapFunction<String, WordWithCount>) (word, collector) -> {
collector.collect(new WordWithCount(word, 1));
})
.returns(WordWithCount.class)
.keyBy("word")
.timeWindow(Time.seconds(5))
.reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
new WordWithCount(c1.word, c1.count + c2.count));
// emit result via Pulsar producer
wc.addSink(new FlinkPulsarProducer<>(
serviceUrl,
outputTopic,
new AuthenticationDisabled(),
wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
wordWithCount -> wordWithCount.word)
);
开发人员可以利用的两个框架之间的另一个集成包括将Pulsar用作Flink SQL或Table API查询的流式源和流式表接收器,如下例所示:
// obtain a DataStream with words
DataStream<String> words = ...
// register DataStream as Table "words" with two attributes ("word", "ts").
// "ts" is an event-time timestamp.
tableEnvironment.registerDataStream("words", words, "word, ts.rowtime");
// create a TableSink that produces to Pulsar
TableSink sink = new PulsarJsonTableSink(
serviceUrl,
outputTopic,
new AuthenticationDisabled(),
ROUTING_KEY);
// register Pulsar TableSink as table "wc"
tableEnvironment.registerTableSink(
"wc",
sink.configure(
new String[]{"word", "cnt"},
new TypeInformation[]{Types.STRING, Types.LONG}));
// count words per 5 seconds and write result to table "wc"
tableEnvironment.sqlUpdate(
"INSERT INTO wc " +
"SELECT word, COUNT(*) AS cnt " +
"FROM words " +
"GROUP BY word, TUMBLE(ts, INTERVAL '5' SECOND)");
最后,Flink将批量工作负载与Pulsar集成为批处理接收器,其中所有结果在Apache Flink完成静态数据集中的计算后被推送到Pulsar。这样的例子如下所示:
// obtain DataSet from arbitrary computation
DataSet<WordWithCount> wc = ...
// create PulsarOutputFormat instance
OutputFormat pulsarOutputFormat = new PulsarOutputFormat(
serviceUrl,
topic,
new AuthenticationDisabled(),
wordWithCount -> wordWithCount.toString().getBytes());
// write DataSet to Pulsar
wc.output(pulsarOutputFormat);
结论
Pulsar和Flink都对应用程序的数据和计算级别如何以批量作为特殊情况流“流式传输”方式分享了类似的观点。通过Pulsar的Segmented Streams方法和Flink在一个框架下统一批处理和流处理工作负载的步骤,有许多方法将这两种技术集成在一起,以提供大规模的弹性数据处理。
最新活动更多
-
10 阿里AI需要算一笔账了
- 1 GPT-6要来了,但AI行业早不跟 OpenAI玩了
- 2 火爆的“Token经济学”,关乎你的钱包、职场和未来消费 | 人人能懂的产业报告
- 3 资本巨头纷纷抽身,为何中小投资者仍为AI狂热加码?
- 4 大厂财报中的AI图鉴:营收单列、玩杠杆、商业画饼
- 5 从百度到Meta,科技巨头的 AI 组织战,开打了
- 6 2026年3月,国内具身智能机器人企业融资汇总
- 7 华勤财报发布:收入规模破1700亿,利润增长近40%
- 8 宇树科技招股书透视:中外具身智能玩家生存竞速
- 9 大涨30%!智谱 AI 财报出炉:营收暴增132%,API 增长3倍,市值破 4000 亿
- 10 谷歌Gemma 4遭破解!实测:伪造支票、找盗版电影,有求必应


分享














发表评论
登录
手机
验证码
手机/邮箱/用户名
密码
立即登录即可访问所有OFweek服务
还不是会员?免费注册
忘记密码其他方式
请输入评论内容...
请输入评论/评论长度6~500个字
暂无评论
暂无评论