First, a reproducible demo:
import time

import sqlalchemy as sql
from apscheduler.executors.pool import ProcessPoolExecutor, ThreadPoolExecutor
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()


class TestData(db.Model):
    id = sql.Column(sql.BigInteger, primary_key=True)


def create_app() -> Flask:
    app: Flask = Flask(__name__)
    app.config["SQLALCHEMY_DATABASE_URI"] = "mysql+pymysql://uri"
    db.init_app(app)
    return app


app = create_app()


def test():
    id_ = int(time.time() * 1000)
    print(f"{id_} begin")
    with app.app_context():
        q = TestData.query.first()
        print(q)
    print(f"{id_} end")


if __name__ == "__main__":
    executors = {
        'default': ThreadPoolExecutor(5),
        'process1': ProcessPoolExecutor(1),
    }
    scheduler = BlockingScheduler(executors=executors)
    scheduler.add_job(
        func=test,
        id="task1",
        max_instances=1,
        executor='default',
    )
    scheduler.add_job(
        func=test,
        # trigger=IntervalTrigger(seconds=5),
        id="task2",
        max_instances=1,
        executor='process1',
    )
    scheduler.start()
Running the demo above produces the following output:
1720411054366 begin
1720411054376 begin
<TestData 1>
1720411054366 end
The query from 1720411054376 appears to hang and never finishes.
In my testing, the hang occurs only when both of the following conditions are met:
- task1 uses ThreadPoolExecutor and task2 uses ProcessPoolExecutor. If both tasks use ThreadPoolExecutor, or both use ProcessPoolExecutor, there is no problem.
- Both tasks start at the same time. If I add trigger=IntervalTrigger(seconds=5) to task1 or task2, there is no problem.
What causes this problem, and how can I solve it?
Package versions:
Flask_SQLAlchemy==2.4.4
Flask==1.1.2
APScheduler==3.10.1
UPDATE:
I used py-spy to inspect the call stack of the hanging process; it is stuck here:
get_engine (flask_sqlalchemy/__init__.py:955)
engine (flask_sqlalchemy/__init__.py:943)
__init__ (flask_sqlalchemy/__init__.py:138)
__call__ (sqlalchemy/orm/session.py:3309)
__call__ (sqlalchemy/util/_collections.py:1022)
__call__ (sqlalchemy/orm/scoping.py:78)
__get__ (flask_sqlalchemy/__init__.py:514)
test (test_schduler.py:33)
The get_engine implementation in flask_sqlalchemy/__init__.py is:

def get_engine(self, app=None, bind=None):
    """Returns a specific engine."""
    app = self.get_app(app)
    state = get_state(app)

    with self._engine_lock:
        connector = state.connectors.get(bind)

        if connector is None:
            connector = self.make_connector(app, bind)
            state.connectors[bind] = connector

        return connector.get_engine()
It seems that there is a problem with the state of _engine_lock when multiple processes are involved: the child process blocks while acquiring it. I don't know how to solve this problem yet.