ElasticSearch-01丨Syncing data to ES with canal

Posted by jiefang on November 26, 2020

Syncing data to Elasticsearch with canal

Introduction

canal [kə'næl], literally a waterway/pipe/channel, parses the MySQL incremental log (binlog) and provides incremental data subscription and consumption.

Features

  • Supports all platforms;

  • Supports fine-grained system monitoring backed by Prometheus;

  • Supports parsing and subscribing to the MySQL binlog in different ways (for example via GTID);

  • Supports high-performance, real-time data synchronization;

  • Both Canal Server and Canal Client support HA/scalability backed by Apache ZooKeeper;

  • Docker support;

How canal works

1. MySQL master-slave replication

(figure: MySQL master-slave replication)

  • The master MySQL server records every write operation in the binlog and spawns a log dump thread that sends the binlog to the I/O thread of the slave MySQL server;
  • The slave MySQL server runs two threads: an I/O thread and a SQL thread;
  • The slave's I/O thread requests the binlog from the master and writes its contents into the relay log;
  • The slave's SQL thread reads the relay log and replays the operations it contains, so that the slave performs the same operations as the master and the two databases eventually hold the same data;

2. How canal works

(figure: how canal works)

  • Canal emulates the MySQL slave interaction protocol: it disguises itself as a MySQL slave and sends the dump protocol to the MySQL master;
  • The MySQL master receives the dump request and starts pushing the binlog to the slave, i.e. to Canal;
  • Canal parses the binlog objects (originally a byte stream); these are the same events you can peek at on the MySQL side, as in the quick check below;
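
Incidentally, the raw events that canal parses can be inspected directly on the MySQL side. A quick look, assuming a binlog file named mysql-bin.000001 (use whatever file SHOW MASTER STATUS reports on your server):

mysql> show binlog events in 'mysql-bin.000001' limit 10;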

3. canal deployment architecture

A canal deployment is split into a server side and a client side.

  • The server-side project is canal.deployer. Once the server is deployed it can listen to the MySQL binlog directly, because it presents itself to MySQL as a slave. It only receives data and applies no business logic; any actual processing has to happen on the client side;
  • The client-side project is canal.client-adapter. You can either write your own client or use the official canal-adapter, which converts the data obtained from the canal server into several common downstream targets; it currently supports kafka, rocketmq, hbase and elasticsearch, and for these targets configuration alone is enough, no coding required;

4. canal admin console

Starting with canal 1.1.4, Alibaba provides a web management UI for canal; canal-admin can only be deployed with canal versions >= 1.1.4. canal-admin is designed to give canal operations-oriented features such as centralized configuration management and node administration, through a relatively friendly WebUI, so that more users can operate canal quickly and safely.

Deploying MySQL

1. Install MySQL

2. Enable the binlog

Add the following settings to /etc/my.cnf (or my.ini) and restart the mysql service:

# enable the binlog
log-bin=mysql-bin
# use ROW format
binlog-format=ROW
# required for MySQL replication; must not clash with canal's slaveId
server_id=1

3. Create the account and grant privileges

mysql> create user canal identified by 'canal';                           # create the canal account
Query OK, 0 rows affected (0.08 sec)

mysql> grant select,replication slave,replication client on *.* to 'canal'@'%'; # grant select and replication privileges to the canal account
Query OK, 0 rows affected (0.02 sec)

mysql> flush privileges;                                                        # reload the grant tables
Query OK, 0 rows affected (0.00 sec)
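
To double-check the grant, list the canal account's privileges; the output should be a single GRANT line containing SELECT, REPLICATION SLAVE and REPLICATION CLIENT:

mysql> show grants for 'canal'@'%';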

4. Check that the binlog is enabled

mysql>  show variables like 'log_bin%';
+---------------------------------+---------------------------------------+
| Variable_name                   | Value                                 |
+---------------------------------+---------------------------------------+
| log_bin                         | ON                                    |
| log_bin_basename                | /usr/local/mysql/data/mysql-bin       |
| log_bin_index                   | /usr/local/mysql/data/mysql-bin.index |
| log_bin_trust_function_creators | OFF                                   |
| log_bin_use_v1_row_events       | OFF                                   |
+---------------------------------+---------------------------------------+
5 rows in set (0.00 sec)
mysql> show variables like 'binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)

Deploying Elasticsearch

Deploying the canal.deployer server

1. Download and extract

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.deployer-1.1.5-SNAPSHOT.tar.gz
mkdir /usr/local/canal/deployer
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal/deployer/

2. Edit the deployer configuration

The configuration is edited in the conf folder. Canal ships with a default instance named example; edit that instance's configuration. If you need more instances, create a new instance folder with its own configuration for each one (and register it as shown in the note below).
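
Note that a new instance directory only takes effect if it is also listed in conf/canal.properties. A minimal sketch, assuming a hypothetical second instance named instance2:

# /usr/local/canal/deployer/conf/canal.properties
canal.destinations = example,instance2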

Edit the instance.properties configuration file in /usr/local/canal/deployer/conf/example.

#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=1234                             # slave ID, must not be the same as the MySQL server_id

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=10.222.25.1:3306                # MySQL address and port
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://10.222.25.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=root
#canal.instance.tsdb.dbPassword=

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root                                    # MySQL username
canal.instance.dbPassword=root                                    # MySQL password
canal.instance.connectionCharset = UTF-8                          # character set, must match MySQL
canal.instance.defaultDatabaseName=tha_overseas_business_man_dev  # database to synchronize
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*                # regex of table names to exclude from monitoring
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

3. Start the deployer

Go to /usr/local/canal/deployer/bin and run the startup.sh script.

[root@YZ-222-25-1 bin]# ./startup.sh 

4. Check the ports and logs

Check the ports

[root@YZ-222-25-1 bin]# ss -anplt |grep java
LISTEN     0      50           *:11110                    *:*                   users:(("java",pid=22328,fd=91))
LISTEN     0      50           *:11111                    *:*                   users:(("java",pid=22328,fd=87))
LISTEN     0      3            *:11112                    *:*                   users:(("java",pid=22328,fd=70))

By default, canal-deployer listens on three ports: 11110, 11111 and 11112.

  • 11110: the admin management port;
  • 11111: the port the canal deployer server itself listens on;
  • 11112: the metrics pull port (a quick check follows below);
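
Port 11112 serves the Prometheus metrics mentioned in the feature list. Assuming the default configuration, a quick sanity check looks roughly like this (metric names vary between canal versions):

[root@YZ-222-25-1 bin]# curl -s http://127.0.0.1:11112/metrics | head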

Check the deployer log

Go to /usr/local/canal/deployer/logs/canal and check canal.log.

[root@YZ-222-25-1 canal]# tail -f canal.log
2020-11-04 19:27:35.404 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2020-11-04 19:27:35.473 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2020-11-04 19:27:35.494 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2020-11-04 19:27:35.583 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.222.25.1(10.222.25.1):11111]
2020-11-04 19:27:36.991 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

Check the example instance log

Go to /usr/local/canal/deployer/logs/example and check example.log.

[root@YZ-222-25-1 example]# tail -f example.log
2020-11-04 19:27:36.278 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-11-04 19:27:36.285 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-11-04 19:27:36.651 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2020-11-04 19:27:36.667 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2020-11-04 19:27:36.667 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2020-11-04 19:27:36.797 [destination = example , address = /10.222.25.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-11-04 19:27:36.908 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-11-04 19:27:36.954 [destination = example , address = /10.222.25.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
 {"identity":{"slaveId":-1,"sourceAddress":{"address":"YZ-222-25-1.h.abchost.local","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":1965,"serverId":1,"timestamp":1604473186000}}
2020-11-04 19:27:36.994 [destination = example , address = /10.222.25.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=1965,serverId=1,gtid=,timestamp=1604473186000] cost : 157ms , the next step is binlog dump

Deploying canal.adapter

Download

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz

Edit conf/application.yml

Edit the application.yml configuration file in /usr/local/canal/adapter/conf.

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # kafka rocketMQ
  canalServerHost: 127.0.0.1:11111
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  username:
  password:
  vhost:
  ## multiple source data sources can be configured
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/tha_overseas_business_man_dev?useUnicode=true
      username: root
      password: root
  ## multiple adapter instances can be configured; the instance name must match the instance configured in canal.deployer
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    ## the same data can be consumed by several groups at once; groups run in parallel with each other, while the outerAdapters inside one group (e.g. logger and es below) run one after another
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      ## ES version
      - name: es7
        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
        properties:
          mode: transport # or rest
#          # security.auth: test:123456 #  only used for rest mode
          cluster.name: elasticsearch

Edit the mapping file

Go to /usr/local/canal/adapter/conf/es7 and add or modify the mapping file there.

dataSourceKey: defaultDS # name of the data source defined in the adapter's application.yml
destination: example # canal instance name
groupId: g1 # group id defined in the adapter's application.yml
esMapping:
  _index: store # index name
  _id: id
  upsert: true
#  pk: id
  sql: "SELECT t.id id,
          t.store_code storeCode,
          t.merchant_id merchantId,
          t.store_name storeName,
          t.short_name shortName,
          t.business_phone businessPhone,
          t.STATUS status,
          t.delete_status deleteStatus,
          t.category category,
          t.channel channel,
          t.opening_hour openingHour,
          t.closing_hour closingHour,
          concat(IFNULL(t.business_lat, 0), ',', IFNULL(t.business_lng, 0)) as geoPoint,
          t.business_lng businessLng,
          t.business_lat businessLat,
          t.food_ordering foodOrdering,
          t.self_pickup_remark selfPickupRemark,
          t.payment_flag paymentFlag,
          t.self_pickup_effect_time selfPickupEffectTime
      FROM
          store t"
#  objFields:
#    _labels: array:;
  etlCondition: "where t.created_date>={}"
  commitBatch: 3000
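
The etlCondition placeholder {} is filled in by the params value of the adapter's full-import REST endpoint. A hedged example, assuming this mapping file is saved as store.yml and the adapter listens on its default port 8081:

curl -X POST http://127.0.0.1:8081/etl/es7/store.yml -d "params=2020-01-01 00:00:00"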

Create the index in ES

Open Kibana, find "Dev Tools" in the left-hand menu, and create the index and its mapping there, roughly as sketched below.
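
A minimal sketch of the Dev Tools request. The field names follow the SQL in the mapping file above; the field types are assumptions to adapt to your own search requirements, except that geoPoint should be a geo_point so the concatenated "lat,lng" value is indexed as a coordinate (add the remaining columns from the SQL in the same way):

PUT /store
{
  "mappings": {
    "properties": {
      "storeCode":   { "type": "keyword" },
      "storeName":   { "type": "text" },
      "status":      { "type": "integer" },
      "geoPoint":    { "type": "geo_point" },
      "businessLng": { "type": "double" },
      "businessLat": { "type": "double" }
    }
  }
}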

Start the adapter

Go to /usr/local/canal/adapter/bin, start the service, and check the log.

[root@YZ-222-25-1 adapter]# tail -f adapter.log
2020-11-04 19:52:31.323 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2020-11-04 19:52:31.325 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2020-11-04 19:52:31.428 [Thread-4] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterWorker - =============> Start to subscribe destination: example <=============
2020-11-04 19:52:31.431 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
2020-11-04 19:52:31.436 [main] INFO  c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 12.191 seconds (JVM running for 13.316)
2020-11-04 19:52:31.471 [Thread-4] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterWorker - =============> Subscribe destination: example succeed <=============
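
Once deployer and adapter are both running, the incremental sync can be verified end to end. A sketch, assuming the store table accepts an insert with just these columns (the column list and values are hypothetical) and that Elasticsearch is reachable locally; since the mapping uses the id column as _id, the new row should appear as document 1001:

mysql> insert into store (id, store_code, store_name) values (1001, 'S1001', 'test store');

[root@YZ-222-25-1 ~]# curl -s http://127.0.0.1:9200/store/_doc/1001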

A problem when debugging locally

When debugging canal.adapter locally, the following error appears: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource.

2020-11-03 19:22:17.384 [main] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
	at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
	at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:225) [classes/:na]
	at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:56) [classes/:na]
	at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) [classes/:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_231]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_231]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_231]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_231]
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:365) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:308) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:135) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1694) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:579) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:501) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$1(AbstractBeanFactory.java:353) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.cloud.context.scope.GenericScope$BeanLifecycleWrapper.getBean(GenericScope.java:390) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at org.springframework.cloud.context.scope.GenericScope.get(GenericScope.java:184) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:350) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1089) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.cloud.context.scope.refresh.RefreshScope.eagerlyInitialize(RefreshScope.java:126) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at org.springframework.cloud.context.scope.refresh.RefreshScope.start(RefreshScope.java:117) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_231]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_231]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_231]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_231]
	at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:264) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:182) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:144) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:400) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:354) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:888) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:759) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:395) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:327) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
	at com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication.main(CanalAdapterApplication.java:19) ~[classes/:na]
Caused by: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
	at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.init(ESAdapter.java:83) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
	at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:52) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
	... 42 common frames omitted
Caused by: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
	at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.addSyncConfigToCache(ESAdapter.java:146) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
	at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.init(ESAdapter.java:75) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
	... 43 common frames omitted

The root cause is that the DruidDataSource instance put into the map and the DruidDataSource class used to read it back were loaded by different class loaders. client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar bundles the druid classes, so druid is loaded once by the AppClassLoader and then loaded a second time from that fat jar by canal's custom URLClassExtensionLoader. The two DruidDataSource classes therefore belong to different class loaders, and the cast fails.

Line 146 of com.alibaba.otter.canal.client.adapter.es.core.ESAdapter:
DruidDataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());

DruidDataSource.class.getClassLoader() is:
com.alibaba.otter.canal.client.adapter.support.URLClassExtensionLoader

DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey()).getClass().getClassLoader() is:
sun.misc.Launcher$AppClassLoader

Solution

In the pom.xml of client-adapter.escore, the druid dependency needs its scope set to provided.

<dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <scope>provided</scope> <!-- add this so the es client-adapter fat jar (xxx-jar-with-dependencies.jar) no longer bundles druid -->
</dependency>