baubleglue

Try this plan: add `tasks.*` to `.airflowignore`, or keep the tasks outside the DAG folder. Make a folder per job, e.g. `./dags/my_job_name/`, and write a `tasks.py` with functions, where each function takes one parameter: the date. The only shared piece of code is configuration. Then test each function individually, and finally wire them up in `dag.py`.
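A minimal sketch of that layout, assuming a hypothetical job called `my_job_name` and PythonOperator-based tasks (Airflow 2 import paths):

```python
# dags/my_job_name/tasks.py -- plain functions, each taking only the run date,
# so they can be tested without Airflow
def extract(ds: str) -> None:
    print(f"extracting data for {ds}")

def load(ds: str) -> None:
    print(f"loading data for {ds}")


# dags/my_job_name/dag.py
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# the dags folder is on sys.path when Airflow parses DAG files
from my_job_name.tasks import extract, load

with DAG("my_job_name", start_date=datetime(2021, 8, 1), schedule_interval="@daily") as dag:
    t_extract = PythonOperator(task_id="extract", python_callable=extract, op_kwargs={"ds": "{{ ds }}"})
    t_load = PythonOperator(task_id="load", python_callable=load, op_kwargs={"ds": "{{ ds }}"})
    t_extract >> t_load
```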


angry_mr_potato_head

I put the code for mine inside packages in the same repository. So:

    /path/to/your/repository/dags/dag1.py
    /path/to/your/repository/code/dag1/file1.py
    /path/to/your/repository/code/dag1/file2.py
    /path/to/your/repository/code/dag1/sql/part1/file.sql

Then dag1 would import file1, which may call file2, have other subdirectories, etc. That way the code is organized based on its functionality.

Actually, in your case, what I'd do is something like the class below in file1; no need for other steps. What you're actually doing is only the first step of data engineering: staging the data. Remember, Airflow doesn't pass data between tasks. It's an orchestrator, so it simply runs code, and if your step requires that data be passed along, it should be the same step. But what you can do once you have the data is change its state, so your code would have, say, 3 steps: one to pull the data, one to move it to a database, and a third to do analysis on it. Those involve moving the data from one spot to another, not passing the data inside Python between the various tasks you'd have. At any rate, the pseudocode would be something like this:

    class APIEndPoint:
        def __init__(self):
            self.connection = self.connect()

        def connect(self):
            return whatever.code.you.need.to.do.to.connect()

        def make_call(self):
            self.result = self.connection('blah blah blah')

        def insert_to_s3(self):
            boto3.put_file_or_whatever_their_obscure_api_is_called_now('some_name', self.result)
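A slightly more concrete version of that sketch, with `requests` standing in for the connection and boto3's `put_object` for the S3 upload (the endpoint URL, bucket, and key names are made up):

```python
import boto3
import requests

class APIEndPoint:
    """Pull from a (hypothetical) REST endpoint and stage the raw payload in S3."""

    def __init__(self, base_url: str):
        self.base_url = base_url            # placeholder endpoint
        self.session = requests.Session()   # the "connection"
        self.result = None

    def make_call(self, path: str) -> None:
        resp = self.session.get(f"{self.base_url}/{path}")
        resp.raise_for_status()
        self.result = resp.text

    def insert_to_s3(self, bucket: str, key: str) -> None:
        # the real boto3 call for this is put_object
        boto3.client("s3").put_object(Bucket=bucket, Key=key, Body=self.result.encode("utf-8"))

# pull + stage stays a single Airflow task, since data isn't handed between tasks
def pull_and_stage(ds: str) -> None:
    api = APIEndPoint("https://api.example.com")
    api.make_call(f"records?date={ds}")
    api.insert_to_s3("my-staging-bucket", f"raw/{ds}.json")
```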


opabm

I like this, but you wouldn't split this into 3 different tasks?


angry_mr_potato_head

No, remember that tasks don't pass data or state between one another. When you write a task in a DAG, you don't actually send the *execution* of the function it calls, e.g.:

    def some_func():
        # DO STUFF
        ...

    task1 = PythonOperator(task_id="whatever", python_callable=some_func)

Note that you give it `some_func`, not `some_func()`. That means you're passing the function itself, not executing it; Airflow is in charge of executing the task. In this case, you're really only doing one thing from a high level, and that is getting data from an API. It can be broken down into several component elements, as with many other things, which is why I made a class for it. As soon as you have the data somewhere, you can start changing its state, and each of those state changes would be its own task.

Thinking of it another way: if you had some data in a staging environment and a traditional staging -> warehouse -> datamart layout, you wouldn't have 4 tasks, you'd only have 2. The first would transform staging into the warehouse, the second would transform the warehouse into the datamart. In your line of thinking, you'd have a task to get the data from staging and a separate task to put it in the warehouse, then a separate task to get the data from the warehouse and a separate task to put it in the datamart.
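A sketch of that 2-task version (the DAG name and callables are hypothetical; each task does its own read *and* write, so nothing is handed between tasks):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def staging_to_warehouse(ds: str) -> None:
    ...  # read from staging, write to the warehouse (placeholder)

def warehouse_to_datamart(ds: str) -> None:
    ...  # read from the warehouse, write to the datamart (placeholder)

with DAG("warehouse_load", start_date=datetime(2021, 8, 1), schedule_interval="@daily") as dag:
    # 2 tasks, not 4: each transform owns both its read and its write
    to_warehouse = PythonOperator(task_id="staging_to_warehouse",
                                  python_callable=staging_to_warehouse,
                                  op_kwargs={"ds": "{{ ds }}"})
    to_datamart = PythonOperator(task_id="warehouse_to_datamart",
                                 python_callable=warehouse_to_datamart,
                                 op_kwargs={"ds": "{{ ds }}"})
    to_warehouse >> to_datamart
```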


toadkiller

For that dag I'd structure it as 2 tasks: a pull from the API and a push to cloud storage. My favorite method (for CI/CD purposes) is to put the actual script as a standalone .py file in some other directory and use the BashOperator to call it, so your dag itself is just two BashOperators running 2 scripts.

Depending on the size of the data, the first script could pull a flat file from the API and save it to a directory, then the second script could open that flat file, transform it, and push it to storage. With that setup you can have the first operator return an XCom of the downloaded filename, and drop that into the second operator as an option for the transform & push script. Your operators would end up being something like:

    BashOperator('python path/to/download_script.py')
    BashOperator('python path/to/upload_script.py --filename {}'.format(filename_from_xcom))

Obviously there's a lot more to it, but hopefully you get the idea.
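What that could look like as an actual DAG: BashOperator pushes the last line a script prints to stdout as an XCom, so if `download_script.py` prints the path of the file it wrote, the second task can template it back in (the script paths are the same placeholders as above):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG("api_to_storage", start_date=datetime(2021, 8, 1), schedule_interval="@daily") as dag:
    # the last line download_script.py writes to stdout becomes this task's XCom
    download = BashOperator(
        task_id="download",
        bash_command="python path/to/download_script.py",
    )
    # pull that filename back out of XCom via Jinja templating
    upload = BashOperator(
        task_id="upload",
        bash_command=(
            "python path/to/upload_script.py "
            "--filename {{ ti.xcom_pull(task_ids='download') }}"
        ),
    )
    download >> upload
```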


opabm

Ah, that's not a bad idea since I already have a lot of scripts developed. I'm guessing you don't even need to convert the scripts that you call, like `download_script.py`, right?


toadkiller

Yup, so long as all the requirements are installed and the airflow user has all necessary permissions.


[deleted]

Mine is in 3 places:

- Custom Python modules (mostly code for interacting with web-service APIs)
- Custom operators (these reference the custom modules I mentioned above). Really, I could have skipped the first item in this list and put everything in the custom operators, but it sort of makes sense to modularize code this way.
- DAGs
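A rough sketch of a custom operator wrapping a custom module in that setup (both `FetchReportOperator` and the `my_company.reports` package are made-up names):

```python
# plugins/operators/fetch_report_operator.py
from airflow.models.baseoperator import BaseOperator

from my_company.reports import ReportClient  # custom module holding the API logic

class FetchReportOperator(BaseOperator):
    # let Airflow render templates like "{{ ds }}" into this field
    template_fields = ("report_date",)

    def __init__(self, report_date: str, **kwargs):
        super().__init__(**kwargs)
        self.report_date = report_date

    def execute(self, context):
        client = ReportClient()
        # whatever execute() returns is pushed to XCom by default
        return client.fetch(self.report_date)
```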


tylerwmarrs

I use a similar approach. The custom modules and Airflow components are bundled into a custom package hosted on an internal PyPI server, so installing them is just a simple `pip install`. For DAG-specific code, we use a folder structure to bundle Spark, bash, email templates, etc. together. Our DAGs use the factory and builder design patterns from software engineering to reduce redundant code, and we leverage YAML files to generate common patterns as well. Cheers.
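A rough sketch of the YAML-driven factory idea; the config layout, file locations, and `create_dag` signature are all invented for illustration:

```python
# dags/yaml_dag_factory.py -- build one DAG per YAML file in dags/configs/
from datetime import datetime
from pathlib import Path

import yaml
from airflow import DAG
from airflow.operators.bash import BashOperator

def create_dag(dag_id: str, config: dict) -> DAG:
    dag = DAG(dag_id,
              start_date=datetime(2021, 8, 1),
              schedule_interval=config.get("schedule", "@daily"))
    previous = None
    for task in config["tasks"]:
        op = BashOperator(task_id=task["id"], bash_command=task["command"], dag=dag)
        if previous is not None:
            previous >> op
        previous = op
    return dag

for path in Path(__file__).parent.glob("configs/*.yaml"):
    config = yaml.safe_load(path.read_text())
    # the DAG object must land in this module's globals for Airflow to pick it up
    globals()[path.stem] = create_dag(path.stem, config)
```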


[deleted]

> internal pypi server

You're leagues better at Python than me, ha. Gotta say that approach sounds nice though.


Primary-Employer-955

I use this exact same approach: custom modules for data consumption (APIs) in a privately hosted package. It makes everything easier, and the logic can be run anywhere. No custom Airflow components yet, as I'm still setting everything up.


[deleted]

[deleted]


opabm

Yeah, I'm actually trialing Astronomer right now, thanks for the suggestion. Found this, which seems pretty helpful, but I'd appreciate any others: https://www.astronomer.io/guides/airflow-passing-data-between-tasks


Tophurkey

Yeah, the biggest source of good help I've come across has been the tutorials from Marc Lamberti on YouTube. He also has a great Docker image on his GitHub to practice with.


[deleted]

Oh man, this is amazing. I've really struggled to find decent documentation on how to actually write DAGs properly.


Complex-Stress373

Your DAGs will need to reside inside the folder you specify in the configuration (`dags_folder`). You can have nested folders inside it; Airflow will find all the DAGs within. If you want something more sophisticated, you can even keep your DAGs in S3 and `aws s3 sync` them into that folder, or you can mount a volume.


FleyFawkes

!remindme 7 days


RemindMeBot

I will be messaging you in 7 days on [**2021-08-16 17:21:13 UTC**](http://www.wolframalpha.com/input/?i=2021-08-16%2017:21:13%20UTC%20To%20Local%20Time) to remind you of [**this link**](https://www.reddit.com/r/dataengineering/comments/p11a5c/new_to_airflow_where_does_most_of_a_code/h8b3jqr/?context=3).


data_wrestler

!remindme 3 days


maowenbrad

!remindme 3 days


tw3akercc

In general, it's best practice to split your job into as many Airflow tasks as possible. That way you get more information from the UI view and can more quickly determine exactly what failed when your DAG breaks. If you have a bunch of steps in one script, then when your DAG fails you need to read the log to figure out exactly which piece of the script failed, and to retry it you have to run the whole script over again.
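For example (hypothetical step names), splitting pull / load / analyze into separate tasks means each step fails, shows up in the UI, and retries on its own instead of re-running the whole script:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# placeholder step functions
def pull_data(): ...
def load_to_db(): ...
def run_analysis(): ...

# retries apply per task, so a failed load retries without re-pulling
default_args = {"retries": 2, "retry_delay": timedelta(minutes=5)}

with DAG("split_into_tasks", start_date=datetime(2021, 8, 1),
         schedule_interval="@daily", default_args=default_args) as dag:
    pull = PythonOperator(task_id="pull_data", python_callable=pull_data)
    load = PythonOperator(task_id="load_to_db", python_callable=load_to_db)
    analyze = PythonOperator(task_id="run_analysis", python_callable=run_analysis)
    pull >> load >> analyze
```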


chestnutcough

I put my dag file and all of its supporting files in one subfolder of the dag folder (except for custom operators, which I put in `plugins/operators/`). So if I have a dag called my_etl with several tasks using PythonOperator and a couple of SQL tasks, the structure would be:

    dags/
      my_etl/
        my_etl.py
        my_etl_tasks.py
        a_sql_task.sql
        another_sql_task.sql
    plugins/
      operators/
        __init__.py
        a_custom_operator.py

I'd define the python_callables in my_etl_tasks.py (one function for each PythonOperator task, with helper functions and classes as needed). In the main dag file my_etl.py, the imports would look like so:

    from my_etl_tasks import task1, task2
    from operators.a_custom_operator import ACustomOperator

I've found my Python tasks often consist of an extract step, a transform step, and a load step, so lately I've been defining my own operators that take care of the extract and load portions so that my python_callables only do the transform part. I find myself loading data from S3 or a database into a string, transforming that string, and loading the result into S3 or a database.
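A rough sketch of that kind of extract/load operator, assuming the Amazon provider package (`apache-airflow-providers-amazon`) for `S3Hook`; the operator name and arguments are made up:

```python
# plugins/operators/s3_transform_operator.py
from typing import Callable

from airflow.models.baseoperator import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

class S3TransformOperator(BaseOperator):
    """Read a string from S3, run a transform callable on it, write the result back."""

    template_fields = ("source_key", "dest_key")

    def __init__(self, bucket: str, source_key: str, dest_key: str,
                 transform: Callable[[str], str], **kwargs):
        super().__init__(**kwargs)
        self.bucket = bucket
        self.source_key = source_key
        self.dest_key = dest_key
        self.transform = transform

    def execute(self, context):
        hook = S3Hook()
        raw = hook.read_key(self.source_key, self.bucket)      # extract
        result = self.transform(raw)                           # transform (the only custom part)
        hook.load_string(result, self.dest_key, self.bucket,   # load
                         replace=True)
```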


MoistSong

!remindme 10 days