功能
【功能】Camel框架的核心是一个路由引擎构建器,允许用户定义自己的路由规则,决定从哪个源接收消息,并确定如何处理这些消息并将其发送到其他目标。
分发到多个目的地
对消息包装
消息去重
【使用建议】Beans是Camel世界的一等公民,用户最好使用Beans,这样用户可以继承Camel内建的功能并自定义。
【语法支持】支持Java、XML、Scala、Groovy。
【消息格式】Camel没有规定消息格式,不管系统使用的协议或数据类型如何,都可以使用相同的API与各种系统进行交互;Camel内置了很多类型转换器,并且可以自动转换,Camel支持80多种协议和数据类型。
【扩展组件】Camel的架构设计就采用了模块化和组件化的思想,因此添加第三方扩展组件以及自定义组件都很容易,此外Camel的组件库也很丰富。
什么是路由?
路由就是将消息从输入队列中取出,并根据一组预设的条件,发送到多个输出队列中的过程。
由于路由的存在,输入和输出队列并不知道消息传递的条件,是故,输入消息传递的逻辑与消息的生产者和消费者之间是解耦的。
graph LR A(输入队列) --> B[消息路由] --> C(输出队列1) B --> C1(输出队列2) B --> C2(输出队列3) B --> Cn(输出队列n) style B fill:greenyellow
什么是JMS?
功能示例
端点URI:可以标识出①想要使用的组件,比如file组件,和②对该组件的相关配置,③可以决定将该组件作为消息生产者,还是作为消息消费者。
1 | from("file://inputdir/?delete=true").to("file://outputdir") |
file端点:监听目录
1 | public class FileCopierWithCamel { |
FTP组件
FTP组件并不在camel-core模块内,所以添加一个额外的依赖。
1 | <dependency> |
从FTP上下载数据:
1 | from("ftp://rider.com/orders?username=rider&password=secret") |
发送到JMS:
队列消息转发
1 | // 从一个消息队列分发到另外三个消息队列 |
消息去重
幂等消息消费者
服务器 => 客户端, 发送了重复的消息
客户端针对同一消息,不论收到几次,都只处理一次。
那么这个客户端就是幂等消息消费者。
幂等消息消费者(Idempotent Consumer)对于重复消息的处理应该有幂等的结果。
IdempotentConsumer类
针对幂等消费者的问题,Apache Camel提供了IdempotentConsumer类作为解决方案。
IdempotentConsumer类的构造方法包括:
- messageIdExpression:一个消息ID表达式。唯一,用来标识消息身份。
- idempotentRepository:一个用来存储消息ID的存储方式(redis,jpa等)。指定如何存储这些消息ID。
- MemoryIdempotentRepository 基于内存
- RedisStringIdempotentRepository 基于Redis
- JpaMessageIdRepository 基于Jpa
- JdbcMessageIdRepository 基于Jdbc
工作原理:
- 如果消息ID在存储IdempotentRepository中不存在,则把消息存储到IdempotentRepository中,并且处理消息。如果消息处理失败,再从IdempotentRepository移除。
- 如果消息ID已经存在,则默认IdempotentCusumer会丢弃重复消息,当然Camel也允许我们自定义如何处理重复消息。
IdempotentConsumer类的构造方法:
1 | public IdempotentConsumerDefinition idempotentConsumer(Expression messageIdExpression, IdempotentRepository<?> idempotentRepository) { |
RedisStringIdempotentRepository去重示例:
1 | /** |