Camel

功能

【功能】Camel框架的核心是一个路由引擎构建器,允许用户定义自己的路由规则,决定从哪个源接收消息,并确定如何处理这些消息并将其发送到其他目标。

  1. 分发到多个目的地

  2. 对消息包装

  3. 消息去重

【使用建议】Beans是Camel世界的一等公民,用户最好使用Beans,这样用户可以继承Camel内建的功能并自定义。
【语法支持】支持Java、XML、Scala、Groovy。
【消息格式】Camel没有规定消息格式,不管系统使用的协议或数据类型如何,都可以使用相同的API与各种系统进行交互;Camel内置了很多类型转换器,并且可以自动转换,Camel支持80多种协议和数据类型。
【扩展组件】Camel的架构设计就采用了模块化和组件化的思想,因此添加第三方扩展组件以及自定义组件都很容易,此外Camel的组件库也很丰富。

什么是路由?

路由就是将消息从输入队列中取出,并根据一组预设的条件,发送到多个输出队列中的过程。

由于路由的存在,输入和输出队列并不知道消息传递的条件,是故,输入消息传递的逻辑与消息的生产者和消费者之间是解耦的。

graph LR

A(输入队列) --> B[消息路由] --> C(输出队列1)
B --> C1(输出队列2)
B --> C2(输出队列3)
B --> Cn(输出队列n)

style B fill:greenyellow

什么是JMS?

功能示例

端点URI:可以标识出①想要使用的组件,比如file组件,和②对该组件的相关配置,③可以决定将该组件作为消息生产者,还是作为消息消费者。

1
from("file://inputdir/?delete=true").to("file://outputdir")

file端点:监听目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class FileCopierWithCamel {

public static void main(String args[]) throws Exception {
// create CamelContext
CamelContext context = new DefaultCamelContext();

// add our route to the CamelContext
context.addRoutes(new RouteBuilder() {
public void configure() {
/**
file: 表示使用文件Component
from 表示从哪里获取数据,进行消费
to 表示将数据生产到哪里
*/
from("file:data/inbox?noop=true").to("file:data/outbox");

// 如果有文件删除,就复制一份到目标目录
from("file://inputdir/?delete=true").to("file://outputdir")
}
});

// start the route and let it do its work
context.start();
Thread.sleep(10000);

// stop the CamelContext
context.stop();
}
}

FTP组件

FTP组件并不在camel-core模块内,所以添加一个额外的依赖。

1
2
3
4
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-ftp</artifactId>
</dependency>

从FTP上下载数据:

1
from("ftp://rider.com/orders?username=rider&password=secret")

发送到JMS:

队列消息转发

1
2
3
4
5
6
7
// 从一个消息队列分发到另外三个消息队列
from("jms:queue:" + Destination.TASK_FINISH_PUBLISH)
.to(
"jms:queue:" + Destination.CONSUMER_TASK_TASK_FINISH,
"jms:queue:" + Destination.CONSUMER_GROWTH_TASK_FINISH,
"jms:queue:" + Destination.CONSUMER_APPINFO_TASK_FINISH
);

消息去重

幂等消息消费者

服务器 => 客户端, 发送了重复的消息
客户端针对同一消息,不论收到几次,都只处理一次。

那么这个客户端就是幂等消息消费者

幂等消息消费者(Idempotent Consumer)对于重复消息的处理应该有幂等的结果。

IdempotentConsumer类

针对幂等消费者的问题,Apache Camel提供了IdempotentConsumer类作为解决方案。

IdempotentConsumer类的构造方法包括:

  1. messageIdExpression:一个消息ID表达式。唯一,用来标识消息身份。
  2. idempotentRepository:一个用来存储消息ID的存储方式(redis,jpa等)。指定如何存储这些消息ID。
    • MemoryIdempotentRepository 基于内存
    • RedisStringIdempotentRepository 基于Redis
    • JpaMessageIdRepository 基于Jpa
    • JdbcMessageIdRepository 基于Jdbc

工作原理:

  1. 如果消息ID在存储IdempotentRepository中不存在,则把消息存储到IdempotentRepository中,并且处理消息。如果消息处理失败,再从IdempotentRepository移除。
  2. 如果消息ID已经存在,则默认IdempotentCusumer会丢弃重复消息,当然Camel也允许我们自定义如何处理重复消息。

IdempotentConsumer类的构造方法:

1
2
3
4
5
public IdempotentConsumerDefinition idempotentConsumer(Expression messageIdExpression, IdempotentRepository<?> idempotentRepository) {
IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition(messageIdExpression, idempotentRepository);
addOutput(answer);
return answer;
}

RedisStringIdempotentRepository去重示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* Camel路由配置
*/
@Configuration
public class CamelConfiguration {
@Resource
private RedisTemplate redisTemplate;
@Bean
RouteBuilder myRouter() {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
// 使用idempotent和camel-redis组件进行去重
// 设置处理进程名为camel-idem
RedisStringIdempotentRepository idempotentRepo = new RedisStringIdempotentRepository(redisTemplate, "camel-idem");
// 设置超时为20秒
idempotentRepo.setExpiry(20);
// 根据taskId和tdrId去重
// 将unique头设置为FINISH+taskId的值+tdrId的值
// 将unique头作为Message ID进行去重
from("jms:queue:" + Destination.TASK_FINISH_PUBLISH)
.setHeader("unique")
.simple("FINISH${body[taskId]}${body[tdrId]}")
.idempotentConsumer(header("unique"), idempotentRepo)
.to(
"jms:queue:" + Destination.CONSUMER_TASK_TASK_FINISH,
"jms:queue:" + Destination.CONSUMER_GROWTH_TASK_FINISH,
"jms:queue:" + Destination.CONSUMER_APPINFO_TASK_FINISH
);
// 根据taskId去重
from("jms:queue:" + Destination.TASK_UNDELIVER_PUBLISH)
.setHeader("unique")
.simple("UNDELIVER${body[taskId]}")
.idempotentConsumer(header("unique"), idempotentRepo)
.to(
"jms:queue:" + Destination.CONSUMER_TASK_TASK_UNDELIVER,
"jms:queue:" + Destination.CONSUMER_GROWTH_TASK_UNDELIVER
);
}
};
}
}
-------------Keep It Simple Stupid-------------
0%