![]() ![]() Query = db.select().where(dicom_ "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 736, in _execute
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 826, in _run_scheduler_loop
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/executors/base_executor.py", line 171, in heartbeat
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/executors/kubernetes_executor.py", line 613, in sync
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/executors/kubernetes_executor.py", line 300, in run_next Table_name = db.Table('table_name', metadata, autoload=True, autoload_with=engine) 'retry_delay': datetime.timedelta(minutes=5), # If a task fails, retry it once after waiting at least 5 minutes # To email on failure or retry set 'email' arg to your email and enable # Setting start date as yesterday starts the DAG immediately when it is ![]() Insert_into='INSERT INTO study(study) VALUES (\'' my_name '\' ) '
Full example combined with Airflow dag and Python BranchOperator ( also committed to git)
From import bigquery_to_gcs
From import gcs_to_bq
From _operator import DummyOperator
From airflow.operators import BashOperator
From import gcs_to_gcs
From _operator import BigQueryOperator
From airflow.operators import PythonOperator
From _operator import BranchPythonOperator
D() - datetime.timedelta(1), ![]() Query = db.select().where(study_圜ol2 =my_name ) Study_table = db.Table('my_table', metadata, autoload=True, autoload_with=engine) To learn quickly SQLAlchemy: I used this blog for the select and this blog for the insert, 1 hour later the below sample code was born.
Engine = get_name_from_airflow_db(my_name): ImportError: this is MySQLdb version (1, 2, 4, 'beta', 4), but _mysql is version (1, 2, 5, 'final', 1) I tried using SQLAlchemy because I assumed since airflow is using it, the packages will be set. 
Push1 > pull1 > push2 > pull2 > push3 > pull3 > push4 > pull4
Eventually, using XCom became so frustrating that I started checking how fast and simple it would be to query the MySQL db directly from the DAG (using a PythonOperator). Go over the Airflow DAG “example_xcom”: trigger the DAG, then for each PythonOperator view the log and watch the XCom section.
0 Comments
Leave a Reply. |
AuthorWrite something about yourself. No need to be fancy, just an overview. ArchivesCategories |