This post is part of the Data Engineering and ETL series. I am taking a short break from Blockchain-related posts. In the past, I have covered Apache Airflow posts here. In this post, I discuss how to use the CCXT library to grab BTC/USD data from exchanges and build an ETL pipeline for data analysis and visualization.

I am using the dockerized version of Airflow. It is easy to set up and uses separate images to run the different components instead of a single-machine setup. The docker-compose.yml file is available here, but I have made a few changes to install custom libraries; therefore, it is advised that you use the file I have provided in the GitHub repo. Before I continue, the demo of the work is given here.

I am fetching BTC/USD data from three different exchanges: Binance, ByBit, and FTX. Once the data is available, I send all three sources into the next stage, where I pull only the close and timestamp data because these are the fields I need for the later stages. From here, I branch out into two tasks: one creates text files that store the data for later use, and the other stores it in the DB. The three files are then sent to the server via FTP. The data from the load_data section is then used to generate reports, and the resulting file is also FTPed to the server.

In Airflow terms, the DAG flow has been set up like below. Notice that some task borders are dark green and some are light green: dark green means the task executed successfully, while light green means it is still running.

In the first stage, all three tasks run in parallel and output their results to transform_data; therefore, I put all three tasks in a Python list. The >> sign indicates that the task on its left is upstream of the task on its right. The transformed data is then sent to the create_text_file and load_data tasks, which themselves are upstream of the ftp_file and generate_pdf_reports tasks. Notice I am using << here to define the upstream tasks for ftp_pdf_file and generate_pdf_reports. Again, these tasks were first added to a Python list and then branched out for the next stage.

In order to pass data from one task to another, Airflow provides the xcom_push and xcom_pull methods. When you use xcom_pull, it means you are fetching data from the task whose id has been passed. Here I passed the task_ids of three tasks and assigned them to three different variables. Wondering where those task_ids are defined? They are defined in the PythonOperators. Check the log of the fetch_binance_ohlcv task: I highlighted the returned value of the task. This value, along with those of the other two tasks, is xcom_pulled in the transform_data task.

I am also using two external components here: FTP and a database. You may use traditional Python code by importing the required libraries, but Airflow also provides the options of hooks and connections. For instance, a pgsql connection looks like below:
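The original screenshot of the connection is not reproduced here. As a stand-in, note that an Airflow Postgres connection can equivalently be supplied as an environment variable (Airflow treats any `AIRFLOW_CONN_<CONN_ID>` variable as a connection in URI form). The connection id, host, and credentials below are placeholders, not the post's actual values:

```shell
# Hypothetical values -- replace with your own. Airflow picks up any
# environment variable named AIRFLOW_CONN_<CONN_ID> as a connection,
# so this defines a connection with conn_id "pgsql_conn".
export AIRFLOW_CONN_PGSQL_CONN='postgresql://airflow_user:airflow_pass@postgres:5432/crypto_db'
```

In a dockerized setup, the same line can go under `environment:` in docker-compose.yml instead of being exported in a shell.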
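The fetch tasks themselves are not reproduced above. A minimal sketch of pulling OHLCV candles with CCXT might look like the following; the function name and parameters are my own assumptions, not the post's code. The exchange object is passed in so the same helper covers Binance, ByBit, and FTX:

```python
# Sketch under assumed names. CCXT's fetch_ohlcv returns rows of
# [timestamp, open, high, low, close, volume]; we keep only the
# timestamp and close fields, as described in the post.
def fetch_ohlcv(exchange, symbol="BTC/USD", timeframe="1h", limit=100):
    rows = exchange.fetch_ohlcv(symbol, timeframe=timeframe, limit=limit)
    return [(row[0], row[4]) for row in rows]

# Real usage would be, e.g.:
#   import ccxt
#   data = fetch_ohlcv(ccxt.binance(), "BTC/USDT")
```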
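The transform step is also not shown. One way the transform_data task could combine the three xcom_pulled (timestamp, close) series is an inner join on timestamp; again, this is a sketch under assumed names, not the post's implementation:

```python
def transform_data(binance, bybit, ftx):
    """Merge three (timestamp, close) series into rows keyed by timestamp.

    Only timestamps present on all three exchanges are kept, so the
    downstream report compares like-for-like candles.
    """
    bybit_map = dict(bybit)
    ftx_map = dict(ftx)
    merged = []
    for ts, close in binance:
        if ts in bybit_map and ts in ftx_map:
            merged.append({"timestamp": ts, "binance": close,
                           "bybit": bybit_map[ts], "ftx": ftx_map[ts]})
    return merged
```

Inside a real PythonOperator callable, the three input lists would come from `ti.xcom_pull(task_ids=...)` rather than function arguments.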
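For readers curious how `>>` and `<<` actually set dependencies, here is a toy model of the bitshift operators; these are not Airflow's real classes, but Airflow's operators behave analogously (and additionally accept lists on either side of the operator):

```python
# Toy model (an assumption, not Airflow's implementation) of how the
# >> and << dependency operators work via __rshift__ / __lshift__.
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()

    def __rshift__(self, other):
        # self >> other: self becomes upstream of other (other may be a list).
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            t.upstream.add(self.task_id)
        return other

    def __lshift__(self, other):
        # self << other: other becomes upstream of self.
        sources = other if isinstance(other, list) else [other]
        for s in sources:
            self.upstream.add(s.task_id)
        return other

# Mirror the post's first stage: three parallel fetches feed transform_data.
fetches = [Task(f"fetch_{x}_ohlcv") for x in ("binance", "bybit", "ftx")]
transform = Task("transform_data")
for f in fetches:
    f >> transform  # real Airflow also allows: fetches >> transform

load = Task("load_data")
reports = Task("generate_pdf_reports")
reports << load  # equivalent to: load >> reports
```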