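Syncing Data to Elasticsearch with canal
Introduction
canal [kə'næl] means waterway, pipe, or ditch. Its main purpose is to parse the MySQL incremental log (binlog) and provide incremental data subscription and consumption.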
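Features
- Runs on all platforms;
- Fine-grained system monitoring, powered by Prometheus;
- Parses and subscribes to the MySQL binlog in several ways, for example via GTID;
- High-performance, real-time data synchronization;
- Both Canal Server and Canal Client support HA and scalability, powered by Apache ZooKeeper;
- Docker support.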
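How canal works
1. How MySQL master-slave replication works
- The MySQL master records all write operations in the binlog and spawns a log dump thread that sends the binlog to the slave's I/O thread;
- The MySQL slave runs two threads: an I/O thread and a SQL thread;
- The slave's I/O thread requests the master's binlog and writes the received events to the relay log;
- The slave's SQL thread reads the relay log and replays its contents as concrete operations, so that the slave ends up consistent with the master.
2. How canal works
- Canal speaks the MySQL slave protocol: it poses as a MySQL slave and sends a dump request to the MySQL master;
- The MySQL master receives the dump request and starts pushing the binlog to the slave, that is, to Canal;
- Canal parses the binlog objects (originally a byte stream).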
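To make the subscribe-and-consume idea concrete, here is a minimal Python sketch of a consumer applying row changes, in order, to a target store. The event dictionaries (type, table, id, data) are a hypothetical simplification chosen for illustration; they are not canal's wire format or its client API.

```python
# Toy model of incremental consumption: apply parsed row changes, in order,
# to a target store. The event shape is hypothetical, not canal's protocol.

def apply_event(store, event):
    """Apply one row-change event to `store`, a dict of table -> {id: row}."""
    rows = store.setdefault(event["table"], {})
    if event["type"] == "INSERT":
        rows[event["id"]] = dict(event["data"])
    elif event["type"] == "UPDATE":
        # Merge the changed columns into the existing row (create it if missing).
        rows.setdefault(event["id"], {}).update(event["data"])
    elif event["type"] == "DELETE":
        rows.pop(event["id"], None)
    else:
        raise ValueError("unknown event type: %r" % event["type"])
    return store


def replay(events):
    """Replay a stream of events in log order and return the final state."""
    store = {}
    for event in events:
        apply_event(store, event)
    return store


events = [
    {"type": "INSERT", "table": "store", "id": 1, "data": {"store_name": "A"}},
    {"type": "UPDATE", "table": "store", "id": 1, "data": {"store_name": "B"}},
    {"type": "INSERT", "table": "store", "id": 2, "data": {"store_name": "C"}},
    {"type": "DELETE", "table": "store", "id": 2, "data": {}},
]
final_state = replay(events)
```

Replaying the same events in the same order always yields the same final state, which is why a consumer that follows the binlog can keep a downstream store consistent with MySQL.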
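3. canal deployment architecture
A canal deployment consists of a server side and a client side.
- The server-side project is canal.deployer. Once deployed, it listens to the MySQL binlog directly. Because the server poses as a MySQL slave, it only receives data and performs no business logic; that logic is left to the client.
- The client-side project is canal.client-adapter. You can write your own client or use the official canal-adapter. The adapter converts the data obtained by the canal server for several common middleware targets. It currently supports kafka, rocketmq, hbase, and elasticsearch, all of which need only configuration, no development.
4. canal admin console
Starting with canal 1.1.4, Alibaba provides a web management UI for canal, canal-admin; it can only be deployed with canal 1.1.4 or later. canal-admin is designed to provide operations-oriented features such as overall configuration management and node maintenance, through a relatively friendly web UI that lets more users operate canal quickly and safely.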
Deploy MySQL
1. Deploy MySQL
2. Enable the binlog
Add the following to the [mysqld] section of /etc/my.cnf (or my.ini), then restart the MySQL service:
# enable the binlog
log-bin=mysql-bin
# use ROW format
binlog-format=ROW
# required for MySQL replication; must not be the same as canal's slaveId
server_id=1
3. Create the account and grant privileges
mysql> create user canal identified by 'canal'; # create the canal account
Query OK, 0 rows affected (0.08 sec)
mysql> grant select,replication slave,replication client on *.* to 'canal'@'%'; # grant query and replication privileges to the canal account
Query OK, 0 rows affected (0.02 sec)
mysql> flush privileges; # reload the grant tables
Query OK, 0 rows affected (0.00 sec)
4. Verify that the binlog is enabled
mysql> show variables like 'log_bin%';
+---------------------------------+---------------------------------------+
| Variable_name | Value |
+---------------------------------+---------------------------------------+
| log_bin | ON |
| log_bin_basename | /usr/local/mysql/data/mysql-bin |
| log_bin_index | /usr/local/mysql/data/mysql-bin.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
+---------------------------------+---------------------------------------+
5 rows in set (0.00 sec)
mysql> show variables like 'binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.00 sec)
Deploy Elasticsearch
Deploy the canal.deployer server
1. Download and extract
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.deployer-1.1.5-SNAPSHOT.tar.gz
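mkdir -p /usr/local/canal/deployer
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal/deployer/
2. Modify the deployer configuration
Configuration lives in the conf folder. Canal ships with a default instance named example; edit that instance's configuration. To run several instances, create a new instance folder with its own configuration for each one.
Edit the instance.properties file in /usr/local/canal/deployer/conf/example.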
#################################################
## mysql serverId , v1.0.26+ will autoGen
# change this ID; it must differ from the MySQL server_id
canal.instance.mysql.slaveId=1234
# enable gtid use true/false
canal.instance.gtidon=false
# position info
# MySQL address and port
canal.instance.master.address=10.222.25.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://10.222.25.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=root
#canal.instance.tsdb.dbPassword=
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
# MySQL account
canal.instance.dbUsername=root
# MySQL password
canal.instance.dbPassword=root
# character set; must match MySQL
canal.instance.connectionCharset = UTF-8
# database to sync
canal.instance.defaultDatabaseName=tha_overseas_business_man_dev
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
# tables matching this regex are excluded from monitoring
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
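The two table filters above are easier to reason about with a small model. The Python sketch below approximates how a whitelist (canal.instance.filter.regex) and a blacklist (canal.instance.filter.black.regex) could be applied to a schema.table name. It assumes comma-separated patterns matched against the whole name, which is consistent with the anchored form (^...$) canal prints in example.log; the function names are hypothetical and details such as case handling are not modeled.

```python
import re


def parse_filter(expr):
    """Split a comma-separated filter expression into compiled regexes.

    Simplification: a pattern that itself contains a comma is not supported.
    """
    return [re.compile(part.strip()) for part in expr.split(",") if part.strip()]


def is_captured(schema, table, white, black):
    """Return True if schema.table passes the whitelist and misses the blacklist."""
    full_name = "%s.%s" % (schema, table)
    allowed = any(p.fullmatch(full_name) for p in white)
    denied = any(p.fullmatch(full_name) for p in black)
    return allowed and not denied


# In instance.properties the backslash is doubled ("\\."); the regex itself is "\.".
WHITE = parse_filter(r".*\..*")
BLACK = parse_filter(r"mysql\.slave_.*")

print(is_captured("tha_overseas_business_man_dev", "store", WHITE, BLACK))
print(is_captured("mysql", "slave_master_info", WHITE, BLACK))
```

With the values from this configuration, every table in every schema is captured except the mysql.slave_* tables.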
3. Start the deployer
Go to /usr/local/canal/deployer/bin and run the startup.sh script.
[root@YZ-222-25-1 bin]# ./startup.sh
4. Check the logs and ports
Check the ports
[root@YZ-222-25-1 bin]# ss -anplt |grep java
LISTEN 0 50 *:11110 *:* users:(("java",pid=22328,fd=91))
LISTEN 0 50 *:11111 *:* users:(("java",pid=22328,fd=87))
LISTEN 0 3 *:11112 *:* users:(("java",pid=22328,fd=70))
canal-deployer listens on three ports by default: 11110, 11111, and 11112.
- 11110: the admin management port;
- 11111: the port used by the canal deployer server;
- 11112: the metrics pull port.
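The same check can be scripted when ss is not at hand, for example from another machine. This is a minimal Python sketch that only tests whether a TCP connection can be opened; the helper names are hypothetical, and a successful connection does not prove that the process behind the port is canal.

```python
import socket

# Default canal-deployer ports and their roles, as listed above.
CANAL_PORTS = {11110: "admin", 11111: "canal server", 11112: "metrics"}


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, unreachable host, and timeouts.
        return False


def check_ports(host, ports=CANAL_PORTS):
    """Map each port to True/False depending on whether it accepts connections."""
    return {port: is_port_open(host, port) for port in ports}
```

For example, check_ports("10.222.25.1") returns a dictionary keyed by port; any False value means that port did not accept a connection.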
View the deployer log
Go to /usr/local/canal/deployer/logs/canal and view canal.log.
[root@YZ-222-25-1 canal]# tail -f canal.log
2020-11-04 19:27:35.404 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2020-11-04 19:27:35.473 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2020-11-04 19:27:35.494 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2020-11-04 19:27:35.583 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.222.25.1(10.222.25.1):11111]
2020-11-04 19:27:36.991 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
View the example instance log
Go to /usr/local/canal/deployer/logs/example and view example.log.
[root@YZ-222-25-1 example]# tail -f example.log
2020-11-04 19:27:36.278 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-11-04 19:27:36.285 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-11-04 19:27:36.651 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2020-11-04 19:27:36.667 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2020-11-04 19:27:36.667 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2020-11-04 19:27:36.797 [destination = example , address = /10.222.25.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-11-04 19:27:36.908 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-11-04 19:27:36.954 [destination = example , address = /10.222.25.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
{"identity":{"slaveId":-1,"sourceAddress":{"address":"YZ-222-25-1.h.abchost.local","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":1965,"serverId":1,"timestamp":1604473186000}}
2020-11-04 19:27:36.994 [destination = example , address = /10.222.25.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=1965,serverId=1,gtid=,timestamp=1604473186000] cost : 157ms , the next step is binlog dump
Deploy canal.adapter
Download
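wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz
mkdir -p /usr/local/canal/adapter
tar -zxvf canal.adapter-1.1.5.tar.gz -C /usr/local/canal/adapter/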
Modify conf/application.yml
Edit the application.yml configuration file in /usr/local/canal/adapter/conf.
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp # kafka rocketMQ
  canalServerHost: 127.0.0.1:11111
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  username:
  password:
  vhost:
  ## multiple data sources can be configured
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/tha_overseas_business_man_dev?useUnicode=true
      username: root
      password: root
  ## multiple instances can be configured; the instance name matches the instance configured in canal.deployer
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    ## the same data can be consumed by several groups at once; groups run in parallel, and within a group the outerAdapters run serially, e.g. logger and es in this example
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      ## ES version
      - name: es7
        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
        properties:
          mode: transport # or rest
          # security.auth: test:123456 # only used for rest mode
          cluster.name: elasticsearch
Modify the mapping file
Go to /usr/local/canal/adapter/conf/es7 and add or edit a mapping file.
dataSourceKey: defaultDS # data source name from the adapter configuration file
destination: example # canal instance
groupId: g1 # group name from the adapter configuration file
esMapping:
  _index: store # index name
  _id: id
  upsert: true
#  pk: id
  sql: "SELECT t.id id,
        t.store_code storeCode,
        t.merchant_id merchantId,
        t.store_name storeName,
        t.short_name shortName,
        t.business_phone businessPhone,
        t.STATUS status,
        t.delete_status deleteStatus,
        t.category category,
        t.channel channel,
        t.opening_hour openingHour,
        t.closing_hour closingHour,
        concat(IFNULL(t.business_lat, 0), ',', IFNULL(t.business_lng, 0)) as geoPoint,
        t.business_lng businessLng,
        t.business_lat businessLat,
        t.food_ordering foodOrdering,
        t.self_pickup_remark selfPickupRemark,
        t.payment_flag paymentFlag,
        t.self_pickup_effect_time selfPickupEffectTime
        FROM
        store t"
#  objFields:
#    _labels: array:;
  etlCondition: "where t.created_date>={}"
  commitBatch: 3000
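The SQL in the mapping file decides the shape of each Elasticsearch document: every column alias becomes a field name, and geoPoint is assembled from latitude and longitude. The Python sketch below mirrors that projection for a single row so the resulting document is easy to picture. It only illustrates the aliasing; it is not the adapter's code, and row_to_doc and ALIASES are hypothetical names.

```python
# Column -> document field, mirroring the aliases in the mapping SQL.
ALIASES = {
    "id": "id",
    "store_code": "storeCode",
    "merchant_id": "merchantId",
    "store_name": "storeName",
    "short_name": "shortName",
    "business_phone": "businessPhone",
    "STATUS": "status",
    "delete_status": "deleteStatus",
    "category": "category",
    "channel": "channel",
    "opening_hour": "openingHour",
    "closing_hour": "closingHour",
    "business_lng": "businessLng",
    "business_lat": "businessLat",
    "food_ordering": "foodOrdering",
    "self_pickup_remark": "selfPickupRemark",
    "payment_flag": "paymentFlag",
    "self_pickup_effect_time": "selfPickupEffectTime",
}


def row_to_doc(row):
    """Return (document id, document body) for one row of the store table."""
    doc = {field: row.get(column) for column, field in ALIASES.items()}
    lat = row.get("business_lat")
    lng = row.get("business_lng")
    # Same idea as concat(IFNULL(lat, 0), ',', IFNULL(lng, 0)): NULL becomes 0.
    doc["geoPoint"] = "%s,%s" % (0 if lat is None else lat, 0 if lng is None else lng)
    # `_id: id` names the field used as the document ID. The id is also kept in
    # the body here for simplicity; the adapter may treat it differently.
    return doc["id"], doc


doc_id, doc = row_to_doc(
    {"id": 7, "store_name": "Demo", "business_lat": 13.75, "business_lng": None}
)
```

The "lat,lng" string produced for geoPoint is one of the formats Elasticsearch accepts for a geo_point field, which is why the SQL concatenates latitude first.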
Create the index in ES
Open Kibana, find "Dev Tools" on the left, then create the index and specify its mapping.
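The original request is not shown here, so the following is only a sketch of what such a request could look like. It uses Python to build the JSON body and print it in the form Dev Tools accepts. The index name store comes from the mapping file; the field list is partial and every field type is an assumption to adapt to the real table schema. Only geoPoint is constrained by the mapping SQL, which builds a "lat,lng" string suitable for a geo_point field.

```python
import json

INDEX_NAME = "store"  # matches _index in the mapping file

# Field types are assumptions for illustration; adjust them to the real schema.
index_body = {
    "mappings": {
        "properties": {
            "storeCode": {"type": "keyword"},
            "merchantId": {"type": "long"},
            "storeName": {"type": "text"},
            "status": {"type": "integer"},
            "geoPoint": {"type": "geo_point"},
        }
    }
}


def dev_tools_request(index, body):
    """Render a create-index request in the form pasted into Kibana Dev Tools."""
    return "PUT /%s\n%s" % (index, json.dumps(body, indent=2))


request_text = dev_tools_request(INDEX_NAME, index_body)
print(request_text)
```

Creating the index with an explicit geo_point mapping before the first sync matters, because a "lat,lng" string would otherwise be mapped dynamically as text.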
Start the adapter
Go to /usr/local/canal/adapter/bin, start the service, and check the log.
[root@YZ-222-25-1 adapter]# tail -f adapter.log
2020-11-04 19:52:31.323 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2020-11-04 19:52:31.325 [main] INFO org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2020-11-04 19:52:31.428 [Thread-4] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterWorker - =============> Start to subscribe destination: example <=============
2020-11-04 19:52:31.431 [main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
2020-11-04 19:52:31.436 [main] INFO c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 12.191 seconds (JVM running for 13.316)
2020-11-04 19:52:31.471 [Thread-4] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterWorker - =============> Subscribe destination: example succeed <=============
Local debugging issue
When debugging canal.adapter locally, the following error occurs: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource.
2020-11-03 19:22:17.384 [main] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:225) [classes/:na]
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:56) [classes/:na]
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) [classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_231]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_231]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_231]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_231]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:365) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:308) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:135) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1694) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:579) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:501) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$1(AbstractBeanFactory.java:353) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.cloud.context.scope.GenericScope$BeanLifecycleWrapper.getBean(GenericScope.java:390) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at org.springframework.cloud.context.scope.GenericScope.get(GenericScope.java:184) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:350) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1089) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.cloud.context.scope.refresh.RefreshScope.eagerlyInitialize(RefreshScope.java:126) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at org.springframework.cloud.context.scope.refresh.RefreshScope.start(RefreshScope.java:117) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_231]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_231]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_231]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_231]
at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:264) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:182) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:144) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:400) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:354) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:888) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:759) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:395) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:327) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication.main(CanalAdapterApplication.java:19) ~[classes/:na]
Caused by: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.init(ESAdapter.java:83) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:52) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
... 42 common frames omitted
Caused by: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.addSyncConfigToCache(ESAdapter.java:146) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.init(ESAdapter.java:75) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
... 43 common frames omitted
The cause is that the DruidDataSource stored in the Map and the class that retrieves it were loaded by different class loaders. client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar bundles the druid classes, so druid is loaded once by the AppClassLoader and then loaded again by the custom URLClassExtensionLoader from that jar. The two DruidDataSource classes therefore belong to different class loaders, and the cast fails.
In class com.alibaba.otter.canal.client.adapter.es.core.ESAdapter, line 146:
DruidDataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
DruidDataSource.class.getClassLoader() is:
com.alibaba.otter.canal.client.adapter.support.URLClassExtensionLoader
DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey()).getClass().getClassLoader() is:
sun.misc.Launcher$AppClassLoader
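An error saying that a class cannot be cast to itself looks contradictory until the type is understood as the pair (class name, loader). The sketch below is a Python analogy, not JVM code: defining the same class source twice in separate namespaces stands in for two class loaders each loading druid, and isinstance stands in for the cast. The variable and function names are hypothetical.

```python
CLASS_SOURCE = "class DruidDataSource:\n    pass\n"


def load_class(source, class_name):
    """Define the class in a fresh namespace, standing in for a separate loader."""
    namespace = {}
    exec(source, namespace)
    return namespace[class_name]


# Two independent "loads" of identical source, like AppClassLoader and
# URLClassExtensionLoader each loading their own copy of druid.
app_loader_class = load_class(CLASS_SOURCE, "DruidDataSource")
ext_loader_class = load_class(CLASS_SOURCE, "DruidDataSource")

data_source = app_loader_class()
same_name = app_loader_class.__name__ == ext_loader_class.__name__
same_type = isinstance(data_source, ext_loader_class)
print(same_name, same_type)
```

The names match but the types do not, which is the same situation the stack trace reports. Ensuring that only one copy of the class can be loaded removes the mismatch.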
Solution
In the pom.xml of client-adapter.escore, set the scope of the druid dependency to provided.
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <!-- add this line so that the es xxxx-with-dependencies.jar does not bundle the druid classes -->
    <scope>provided</scope>
</dependency>