I'll post reproducible demo code first.

import time
import sqlalchemy as sql
from apscheduler.executors.pool import ProcessPoolExecutor, ThreadPoolExecutor
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from flask import Flask
from flask_sqlalchemy import SQLAlchemy


db = SQLAlchemy()


class TestData(db.Model):
    id = sql.Column(sql.BigInteger, primary_key=True)

def create_app() -> Flask:
    app: Flask = Flask(__name__)
    app.config["SQLALCHEMY_DATABASE_URI"] = "mysql+pymysql://uri"
    db.init_app(app)

    return app


app = create_app()

def test():
    id_ = int(time.time() * 1000)
    print(f"{id_} begin")
    with app.app_context():
        q = TestData.query.first()
        print(q)
    print(f"{id_} end")



if __name__ == "__main__":
    executors = {
        'default': ThreadPoolExecutor(5),
        'process1': ProcessPoolExecutor(1),
    }
    scheduler = BlockingScheduler(executors=executors)
    # Neither job has a trigger, so both are scheduled to run once, immediately.
    scheduler.add_job(
        func=test,
        id="task1",
        max_instances=1,
        executor='default',     # runs in the thread pool
    )

    scheduler.add_job(
        func=test,
        # trigger=IntervalTrigger(seconds=5),
        id="task2",
        max_instances=1,
        executor='process1',    # runs in the process pool
    )
    scheduler.start()

Running the demo above produces the following output:

   1720411054366 begin
   1720411054376 begin
   <TestData 1>
   1720411054366 end

The query from 1720411054376 appears to hang and never finishes.

Through testing, I found that it hangs only when both of the following conditions are met:

  1. task1 uses the ThreadPoolExecutor and task2 uses the ProcessPoolExecutor. If both use the ThreadPoolExecutor, or both use the ProcessPoolExecutor, there is no problem.
  2. Both jobs must start at the same time. If I add trigger=IntervalTrigger(seconds=5) to task1 or task2, there is no problem (see the sketch after this list).
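
For reference, this is the variant from condition 2 that does not hang in my tests. It assumes the same module as the demo above (same imports, app, db, and test()); only the __main__ block changes so that task2 gets an IntervalTrigger and the two jobs no longer fire at the same moment.

if __name__ == "__main__":
    executors = {
        'default': ThreadPoolExecutor(5),
        'process1': ProcessPoolExecutor(1),
    }
    scheduler = BlockingScheduler(executors=executors)
    scheduler.add_job(
        func=test,
        id="task1",
        max_instances=1,
        executor='default',
    )
    scheduler.add_job(
        func=test,
        trigger=IntervalTrigger(seconds=5),  # task2 now fires 5 seconds later (and repeatedly)
        id="task2",
        max_instances=1,
        executor='process1',
    )
    scheduler.start()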

What causes this problem, and how can I solve it?

Package versions:

Flask-SQLAlchemy==2.4.4
Flask==1.1.2
APScheduler==3.10.1

UPDATE:

I used py-spy to dump the call stack of the hung job:

get_engine (flask_sqlalchemy/__init__.py:955)
engine (flask_sqlalchemy/__init__.py:943)
__init__ (flask_sqlalchemy/__init__.py:138)
__call__ (sqlalchemy/orm/session.py:3309)
__call__ (sqlalchemy/util/_collections.py:1022)
__call__ (sqlalchemy/orm/scoping.py:78)
__get__ (flask_sqlalchemy/__init__.py:514)
test (test_schduler.py:33)

For reference, here is get_engine from flask_sqlalchemy:

    def get_engine(self, app=None, bind=None):
        """Returns a specific engine."""

        app = self.get_app(app)
        state = get_state(app)

        with self._engine_lock:
            connector = state.connectors.get(bind)

            if connector is None:
                connector = self.make_connector(app, bind)
                state.connectors[bind] = connector

            return connector.get_engine()

It seems the problem is related to the state of _engine_lock when the job runs in a separate process. I don't know how to solve this yet.
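
To illustrate what I suspect is happening (this is only my assumption, not something I have confirmed in Flask-SQLAlchemy or APScheduler internals): if the process pool forks its worker while a thread-pool job is holding _engine_lock, the child inherits a copy of that lock in the locked state, and nothing in the child will ever release it. Below is a minimal standalone sketch of that general fork-plus-lock mechanism, independent of the libraries above (POSIX only, since it uses os.fork).

import os
import signal
import threading
import time

lock = threading.Lock()

def hold_lock():
    # Stand-in for the thread-pool job that holds _engine_lock inside
    # get_engine() at the moment the process-pool worker is forked.
    with lock:
        time.sleep(1)

t = threading.Thread(target=hold_lock)
t.start()
time.sleep(0.1)                 # make sure the worker thread owns the lock

pid = os.fork()                 # stand-in for the process pool forking a worker
if pid == 0:
    # Child: its copy of the lock is still "locked", but the thread that
    # acquired it does not exist in this process, so acquire() blocks forever.
    print("child: trying to acquire the lock ...")
    lock.acquire()
    print("child: acquired (never printed)")
    os._exit(0)

t.join()                        # the parent's thread releases the parent's copy
time.sleep(1)                   # the child is still stuck on its own copy
print("parent: child is still blocked, killing it")
os.kill(pid, signal.SIGKILL)
os.waitpid(pid, 0)

If this is indeed the mechanism, it would also explain why the hang needs both executor types and a simultaneous start.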
