Investigating a flink-jdbc-connector Deadlock in Flink 1.12.0


Symptoms

Yesterday, after I added a custom FlatMapFunction to a Flink job, the job appeared to start normally but consumed no data from Kafka. Ten minutes later, it failed with:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:89)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:240)
    at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyGlobalFailure(UpdateSchedulerNgOnInternalFailuresListener.java:65)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.failGlobal(ExecutionGraph.java:1055)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph$1.lambda$failJob$0(ExecutionGraph.java:477)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:90)
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1760)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1733)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1870)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Digging further into the detailed logs, I found the key entries:

2022-11-23 11:48:19.045 [flink-akka.actor.default-dispatcher-18] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state CANCELED to JobManager for task Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, BaselineWindowProcess) -> Sink: 1601-bba4ee07151842ea89b58983f1f8dbf3 (1/1)#0 3b89c9f4a71976cbc009fd5991818c85.
2022-11-23 11:48:49.025 [Canceler/Interrupts for Co-Process-Broadcast -> (Timestamps/Watermarks, Filter -> Map -> Sink: flink_jdbc_sink_602) (1/1)#0 (c2a688d40f2a98ace6ac421f2410b0b5).] WARN  org.apache.flink.runtime.taskmanager.Task - Task 'Co-Process-Broadcast -> (Timestamps/Watermarks, Filter -> Map -> Sink: flink_jdbc_sink_602) (1/1)#0' did not react to cancelling signal for 30 seconds, but is stuck in method:
 com.mysql.cj.jdbc.Driver.<clinit>(Driver.java:55)
java.lang.Class.forName0(Native Method) 
java.lang.Class.forName(Class.java:264)
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getConnection(SimpleJdbcConnectionProvider.java:52)
org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.establishConnection(AbstractJdbcOutputFormat.java:66)
org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:59)
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:114)
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:50)
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$230/1971941943.run(Unknown Source)
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
java.lang.Thread.run(Thread.java:748)

2022-11-23 11:48:49.025 [Canceler/Interrupts for Source: broadcastStream (1/1)#0 (c563710ba5512bf3f221e6271946ed88).] WARN  org.apache.flink.runtime.taskmanager.Task - Task 'Source: broadcastStream (1/1)#0' did not react to cancelling signal for 30 seconds, but is stuck in method:
 sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1934)
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:618)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:552)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
java.lang.Thread.run(Thread.java:748)

2022-11-23 11:48:49.025 [Canceler/Interrupts for Source: flink_kafka_source_101 -> Map -> Flat Map (1/1)#0 (725449cee2c58af1f0f01d26339effdc).] WARN  org.apache.flink.runtime.taskmanager.Task - Task 'Source: flink_kafka_source_101 -> Map -> Flat Map (1/1)#0' did not react to cancelling signal for 30 seconds, but is stuck in method:
 sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
java.lang.Class.newInstance(Class.java:442)
java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
java.util.ServiceLoader$1.next(ServiceLoader.java:480)
java.sql.DriverManager$2.run(DriverManager.java:603)
java.sql.DriverManager$2.run(DriverManager.java:583)
java.security.AccessController.doPrivileged(Native Method)
java.sql.DriverManager.loadInitialDrivers(DriverManager.java:583)
java.sql.DriverManager.<clinit>(DriverManager.java:101)
org.apache.commons.dbcp.BasicDataSource.<clinit>(BasicDataSource.java:57)
com.xxx.flinktask.util.JDBCUtils.<clinit>(JDBCUtils.java:18)
com.xxx.remo.flink.job.common.function.EtlDataSource.getAssetWhiteNew(EtlDataSource.java:54)
com.xxx.flinktask.task.core.function.EtlFlatMapFunction.open(EtlFlatMapFunction.java:77)
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43)
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$230/1971941943.run(Unknown Source)
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
java.lang.Thread.run(Thread.java:748)

Investigation

First, the job's top-level error, "Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.", only means that too many checkpoints failed. The error itself carries little information; the checkpoint failures are a symptom of a deeper problem.

So I approached the problem from two angles.

1. The code of the newly added custom FlatMapFunction

The business logic of this code should be sound: other jobs use nearly identical logic, and the key code is exactly the same.
So is this FlatMapFunction simply "incompatible" with this particular job? :)
Since the logic itself revealed no root cause, I switched to the other angle.


2. The error logs

Looking closely at the logs, something stood out: the program was stuck in a call stack originating at JDBCUtils.java:18. JDBCUtils is a utility class that thinly wraps a commons-dbcp BasicDataSource connection pool for connecting to MySQL. Could a bug in the connection pool itself be what was hanging the program?

java.sql.DriverManager.<clinit>(DriverManager.java:101)
org.apache.commons.dbcp.BasicDataSource.<clinit>(BasicDataSource.java:57)
com.xxx.flinktask.util.JDBCUtils.<clinit>(JDBCUtils.java:18)

After some searching, I found this issue:

https://issues.apache.org/jira/browse/DBCP-272

It turns out DBCP had fixed a DriverManager deadlock, and the fix was:

public class BasicDataSource implements DataSource {

    static {
        // Attempt to prevent deadlocks - see DBCP - 272
        DriverManager.getDrivers();

    }

    // ...
}

So it does not look like a commons-dbcp problem? Wait a moment: "prevent deadlocks"??


Suddenly it clicked. Reading the Flink job's error log again carefully, I noticed this frame:

org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getConnection(SimpleJdbcConnectionProvider.java:52)

This led to a reasonable suspicion: this method may use Class.forName and thereby trigger the DriverManager deadlock.


I had not paid attention to this stack frame at first, because it belongs to flink-connector-jdbc_2.11, the official connector artifact. I had assumed the official code was fine and that the problem must lie in our own code, not in this package...
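The suspected pattern can be sketched as follows. This is a simplified illustration only, not the actual SimpleJdbcConnectionProvider source; the class name and constructor arguments are hypothetical:

```java
import java.sql.Connection;
import java.sql.DriverManager;

// Simplified sketch of the suspected vulnerable pattern: load the driver
// class by name, then go through DriverManager. Illustrative only.
public class NaiveConnectionProvider {
    private final String driverName;
    private final String url;

    public NaiveConnectionProvider(String driverName, String url) {
        this.driverName = driverName;
        this.url = url;
    }

    public Connection getConnection() throws Exception {
        // If DriverManager.<clinit> has not run yet, this Class.forName can
        // race against another thread that is initializing DriverManager
        // (which itself loads driver classes through ServiceLoader).
        Class.forName(driverName);
        return DriverManager.getConnection(url);
    }
}
```

Note that the danger is not in either call individually, but in running the driver's static initializer while DriverManager's own static initializer may be in flight on another thread.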


Searching further to corroborate this suspicion, I found a Flink community issue:

[FLINK-19435] Deadlock when loading different driver classes concurrently using Class.forName - ASF JIRA

That confirmed the suspicion.

Summary

Our company runs Flink 1.12.0.

In Flink 1.12.0's flink-connector-jdbc, classes such as SimpleJdbcConnectionProvider load the JDBC driver via Class.forName.


When DriverManager has not been initialized beforehand, and two different driver classes are loaded concurrently via Class.forName, a deadlock can arise between DriverManager's static initializer and a driver class's static initializer!
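The interlock can be modeled with two ordinary classes whose static initializers reference each other. Registry and Driver below are hypothetical stand-ins for java.sql.DriverManager and a JDBC driver class, not real JDBC code:

```java
// Minimal model of the class-initialization deadlock.
public class InitDeadlockModel {

    static class Registry {
        static {
            // Like DriverManager.<clinit>: instantiates available drivers via
            // ServiceLoader, running their static blocks while this thread
            // holds Registry's class-initialization lock.
            Driver.register();
        }
        static void register() { }
        static void touch() { }
    }

    static class Driver {
        static {
            // Like a real driver's static block: calls
            // DriverManager.registerDriver, which needs Registry's
            // class-initialization lock.
            Registry.register();
        }
        static void register() { }
        static void touch() { }
    }

    // On a single thread the JVM permits re-entrant initialization, so this
    // completes. With two threads starting from opposite classes, each waits
    // forever for the other's class-init lock: the observed deadlock.
    public static String initOnOneThread() {
        Registry.touch();
        Driver.touch();
        return "ok";
    }
}
```

This matches the two stuck stack traces above: one thread inside com.mysql.cj.jdbc.Driver.&lt;clinit&gt; and another inside DriverManager.&lt;clinit&gt;, each holding the class-init lock the other needs.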


In the problem I hit, flink-connector-jdbc and the MySQL driver class (loaded through commons-dbcp) were being loaded at the same time, before DriverManager had been initialized. The resulting deadlock hung the whole program, which is also why no Kafka data was being consumed.


The way to prevent this deadlock: any class that calls Class.forName to load a Driver should first run, in a static block:

static {
    // Attempt to prevent deadlocks - see DBCP - 272
    DriverManager.getDrivers();
}

just as DBCP does. Flink fixed this deadlock the same way in 1.12.1:

[FLINK-19435][connectors/jdbc] Fix deadlock when loading different sq… · apache/flink@84ed653 · GitHub
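Applied to a user-side utility class, the preventive pattern might look like this (DeadlockSafeJdbcUtil is a hypothetical name, not a real library class):

```java
import java.sql.DriverManager;

// Hypothetical user-side application of the preventive pattern.
public class DeadlockSafeJdbcUtil {
    static {
        // Attempt to prevent deadlocks - see DBCP-272 / FLINK-19435.
        // Runs DriverManager.<clinit> to completion before any concurrent
        // driver-class loading can race with it.
        DriverManager.getDrivers();
    }

    public static Class<?> loadDriver(String driverClassName) throws ClassNotFoundException {
        // Safe now: DriverManager is already fully initialized, so loading a
        // driver's static block cannot deadlock against it.
        return Class.forName(driverClassName);
    }
}
```

The key point is ordering: DriverManager's initialization is forced to finish before any driver class's static initializer can run on this code path.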

Solution

Final options:

  • Upgrade Flink to 1.12.1 or later,
    or
  • Drop flink-connector-jdbc and implement the JDBC sink/source yourself

Further notes

  • On JDK 9 and later this deadlock can no longer occur, because DriverManager's initialization was moved out of its static block;
  • Approach any code with curiosity and skepticism. Even a highly mature open-source project can harbor problems, and those problems are never far from us.


Author: lcok
Copyright: Unless otherwise noted, all posts on this blog are licensed under CC BY 4.0. Please credit lcok when reposting.