Building an automated serverless deployment pipeline with Cloud Build

You’ve got a shiny new application ready to deploy to the cloud. After researching your options, you land on Cloud Run, with Cloud Build building your containerized application code and pushing it to an Artifact Registry repository. Using a Dockerfile and a Cloud Build configuration, three steps build your container, push it to Artifact Registry, and deploy it to Cloud Run:

    steps:
    # Build the container image
    - name: 'gcr.io/cloud-builders/docker'
      args: ['build', '-t', 'us-central1-docker.pkg.dev/my-project/my-app-repo/shiny-new-app', '.']

    # Push the container image to Artifact Registry
    - name: 'gcr.io/cloud-builders/docker'
      args: ['push', 'us-central1-docker.pkg.dev/my-project/my-app-repo/shiny-new-app']

    # Deploy container image to Cloud Run
    - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
      entrypoint: gcloud
      args: ['run', 'deploy', 'my-serverless-app', '--image', 'us-central1-docker.pkg.dev/my-project/my-app-repo/shiny-new-app', '--region', 'us-central1']

    images:
    - us-central1-docker.pkg.dev/my-project/my-app-repo/shiny-new-app

The example above combines the build, push, and deployment steps into one Cloud Build job. This blog will show you what this could look like as a series of manual deployment steps, and how it can be developed into an automatic serverless deployment pipeline that can be used as a jumping-off point for more complex solutions. We’ll be using Cloud Build, Artifact Registry, Pub/Sub, and Cloud Run. We’ll use the open source GitHub project Emblem to model a working reference of Google Cloud serverless architecture. References to Emblem are called out along the way.

A manual pipeline

Let’s start by examining the manual steps to deploy a containerized application to Cloud Run. First, you make application code changes to your repository’s main branch.
When an application change is merged, you use Cloud Build to build a new container. After a successful build, Cloud Build pushes the newly built container to Artifact Registry. You update Cloud Run with a new configuration pointing to the new build. Cloud Run deploys a new revision of your service. Your code changes are now deployed.

Of course, you would need to repeat these steps every time your application code changes. That’s impractical and can turn into a logistical nightmare for a team making continuous updates to the code, not to mention the added complexity of staging changes to multiple environments or incorporating systematic testing or incremental rollouts. Let’s see how you can automate your lovely little pipeline by looking at it as two parts: the build and the deployment.

Automate the build

To automate the build step of your pipeline, Cloud Build should build and push whenever a change is committed to the application code in your repository. Here’s what’s needed to make this happen:

1. Connect your GitHub repository to your Cloud project

By connecting your GitHub repository to your project, Cloud Build can use repository events to initiate a Cloud Build trigger. Common repository events are supported, including pushing to a specific branch, pushing a new tag, and creating a pull request.

2. Include a Cloud Build YAML configuration in your repository

You can configure a Cloud Build job with a build config file. This YAML file provides task-level instructions to Cloud Build. It can live alongside your application’s Dockerfile or in a separate directory in your repository. For an automatic build, your config file will tell Cloud Build to build the container image and push it to Artifact Registry.

The Emblem project continuously builds multiple containers and keeps the corresponding build config files in a centralized ops/ directory.
This allows for the separation of ownership of the Cloud Build configs and the application code they may build.

3. Create a Cloud Build trigger

A Cloud Build trigger can be invoked each time a change is pushed to your main branch. Its configuration requires the GitHub repository to be connected to the Google Cloud project, the name of the branch you want to use, and the path to the Cloud Build configuration file in the repo. The invocation of the Cloud Build trigger can be narrowed down further by specifying files and directories to include or ignore, so that a new build is created only when certain files have changed.

The automatic build triggers featured in Emblem use a Cloud Build config file that builds the container and pushes it to Artifact Registry:

    steps:
      # Docker Build
      - name: 'gcr.io/cloud-builders/docker'
        args:
          - 'build'
          - '-t'
          - '${_REGION}-docker.pkg.dev/${PROJECT_ID}/content-api/content-api:${_IMAGE_TAG}'
          - '.'  # build context; assumed here to be the repository root

    # Default to us-central1
    substitutions:
      _REGION: us-central1
      _IMAGE_TAG: $SHORT_SHA

    # Store in Artifact Registry
    images:
      - '${_REGION}-docker.pkg.dev/${PROJECT_ID}/content-api/content-api:${_IMAGE_TAG}'

The variables prefixed by an underscore (_) allow for substitutions to be provided when configuring the Cloud Build trigger. In the example above, _REGION can be overridden, allowing this configuration to be used unchanged even if the container registry is moved to a new location. Substitutions without an underscore, such as $PROJECT_ID, are built-in and have values provided by Cloud Build. You can see the list of built-in substitutions in the documentation.
Substitutions like these make it possible to reuse a single Cloud Build config across multiple triggers that have a similar function.

Automate the deployment

With the manual pipeline, you know when a new build is pushed, so you can faithfully update the Cloud Run service yourself. For this to work automatically, there needs to be some way of signaling Cloud Run that a new build is available. You can do this with a little help from Pub/Sub and another Cloud Build trigger. Let’s look at this in more detail:

1. The “gcr” Pub/Sub topic

If your Google Cloud project includes a Pub/Sub topic named “gcr”, Artifact Registry will publish messages about changes in its repositories. A message is published every time an image build is pushed, tagged, or deleted. These messages are delivered by a corresponding Pub/Sub subscription to your application or, in our case, to a Cloud Build trigger.

2. Create another Cloud Build trigger

A second Cloud Build trigger is configured to deploy a new revision of your Cloud Run service. In addition to repository events, Cloud Build triggers support Pub/Sub events. You can select the gcr Pub/Sub topic as the trigger event to create a corresponding subscription. With that, your Cloud Run service will be updated automatically when Artifact Registry publishes a message to Pub/Sub.

While it is possible to have a single Cloud Build trigger build, push, and deploy your application, separating the deployment from the build and the push allows each stage to run in a separate Cloud Build job and makes it easier to develop each piece of the pipeline independently of the other.

Emblem features two separate Cloud Build triggers that automatically deploy the website and content-api services to Cloud Run.
They share a common Cloud Build config file:

    steps:
      # Print the full Pub/Sub message for debugging.
      - name: gcr.io/cloud-builders/gcloud
        entrypoint: /bin/bash
        args:
          - '-c'
          - |
            echo ${_BODY}
      # Cloud Run Deploy
      - name: gcr.io/cloud-builders/gcloud
        args:
          - run
          - deploy
          - ${_SERVICE}
          - --image=${_IMAGE_NAME}
          - --region=${_REGION}
          - --revision-suffix=${_REVISION}
          - --project=${_TARGET_PROJECT}
          - --allow-unauthenticated

Once again, the config file uses variables for which values are provided via the trigger’s substitution variable settings. Values for certain variables, such as _BODY, _IMAGE_NAME, and _REVISION, are evaluated using the message received from the gcr Pub/Sub topic, while others are hardcoded.

Why stop there?

The value of this pipeline lies not in its simplicity, but in its potential to be developed further to include more functionality, such as staging changes in a separate Google Cloud project, incorporating automatic testing for each change to your application, or doing incremental rollouts to your Cloud Run service. These can all be achieved with a combination of additional Cloud Build triggers and Pub/Sub topics. Alternatively, with the recent addition of Cloud Run support, Cloud Deploy can be used as a delivery pipeline that deploys to a Cloud Run target, complete with rollbacks, approval, audit, and delivery metrics.

Emblem features a more advanced automatic deployment pipeline based on this model. It includes multiple Google Cloud projects to support staging changes between multiple environments and incremental canary rollouts to production.

To see this in action, visit the Emblem GitHub repository and deploy the Emblem sample application yourself.
For a step-by-step tutorial that will deploy this pipeline, see The Squire’s guide to automated deployments with Cloud Build.
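The deployment trigger described above fills substitutions such as _IMAGE_NAME from the gcr Pub/Sub message. The following Python sketch illustrates that kind of evaluation outside of Cloud Build. It assumes the message payload is base64-encoded JSON with action, digest, and tag fields, which is the shape Artifact Registry documents for its notifications. The helper names, the sample values, and the choice to deploy only on tagged INSERT events are illustrative assumptions and are not taken from Emblem.

```python
import base64
import json


def parse_gcr_message(data_b64: str) -> dict:
    """Decode the base64 `data` field of a gcr Pub/Sub message into a dict."""
    return json.loads(base64.b64decode(data_b64).decode("utf-8"))


def deploy_args(body: dict, service: str, region: str) -> list:
    """Return `gcloud run deploy` arguments, or [] if the event needs no deploy.

    Assumption: only INSERT events that carry a tag should trigger a deployment.
    """
    if body.get("action") != "INSERT" or "tag" not in body:
        return []
    return [
        "run", "deploy", service,
        f"--image={body['tag']}",
        f"--region={region}",
    ]


# Hypothetical message, shaped like an Artifact Registry notification.
sample = {
    "action": "INSERT",
    "digest": "us-central1-docker.pkg.dev/my-project/my-app-repo/shiny-new-app@sha256:abc123",
    "tag": "us-central1-docker.pkg.dev/my-project/my-app-repo/shiny-new-app:v1",
}
encoded = base64.b64encode(json.dumps(sample).encode("utf-8")).decode("ascii")

body = parse_gcr_message(encoded)
print(deploy_args(body, service="my-serverless-app", region="us-central1"))
```

Deletions and pushes without a tag return an empty list, so a trigger built along these lines would skip them instead of redeploying.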
Source: Google Cloud Platform

Optimize Cloud Composer via Better Airflow DAGs

Hosting, orchestrating, and managing data pipelines is a complex process for any business. Google Cloud offers Cloud Composer, a fully managed workflow orchestration service that enables businesses to create, schedule, monitor, and manage workflows that span clouds and on-premises data centers. Cloud Composer is built on the popular Apache Airflow open source project and operates using the Python programming language. Apache Airflow allows users to create directed acyclic graphs (DAGs) of tasks, which can be scheduled to run at specific intervals or triggered by external events.

This guide contains a generalized checklist of activities for authoring Apache Airflow DAGs. These items follow best practices determined by Google Cloud and the open source community. A collection of performant DAGs will enable Cloud Composer to work optimally, and standardized authoring will help developers manage hundreds or even thousands of DAGs. Each item will benefit your Cloud Composer environment and your development process.

Get Started

1. Standardize file names. Help other developers browse your collection of DAG files.
   a. Example: team_project_workflow_version.py
2. DAGs should be deterministic.
   a. A given input will always produce the same output.
3. DAGs should be idempotent.
   a. Triggering the DAG multiple times has the same effect/outcome.
4. Tasks should be atomic and idempotent.
   a. Each task should be responsible for one operation that can be re-run independently of the others. In an atomized task, a success in part of the task means a success of the entire task.
5. Simplify DAGs as much as possible.
   a. Simpler DAGs with fewer dependencies between tasks tend to have better scheduling performance because they have less overhead. A linear structure (e.g. A -> B -> C) is generally more efficient than a deeply nested tree structure with many dependencies.

Standardize DAG Creation

6. Add an owner to your default_args.
   a. Determine whether you’d prefer the email address / ID of a developer, or a distribution list / team name.
7. Use with DAG() as dag: instead of dag = DAG()
   a. This removes the need to pass the dag object to every operator or task group.
8. Set a version in the DAG ID.
   a. Update the version after any code change in the DAG.
   b. This avoids logs of deleted tasks vanishing from the UI, no-status tasks being generated for old DAG runs, and general confusion about when DAGs have changed.
   c. Airflow open-source has plans to implement versioning in the future.
9. Add tags to your DAGs.
   a. Help developers navigate the Airflow UI via tag filtering.
   b. Group DAGs by organization, team, project, application, etc.
10. Add a DAG description.
   a. Help other developers understand your DAG.
11. Pause your DAGs on creation.
   a. This will help avoid accidental DAG runs that add load to the Cloud Composer environment.
12. Set catchup=False to avoid automatic catch-ups overloading your Cloud Composer environment.
13. Set a dagrun_timeout so that DAGs that never finish do not hold Cloud Composer environment resources or introduce collisions on retries.
14. Set SLAs to receive alerts for long-running DAGs.
   a. Airflow SLAs are always defined relative to the start time of the DAG run, not to the start of individual tasks.
   b. Ensure that the sla (a task-level argument that can be applied to every task through default_args) is less than the dagrun_timeout.
   c. Example: If your DAG usually takes 5 minutes to successfully finish, set the sla to 7 minutes and the dagrun_timeout to 10 minutes. Determine these thresholds based on the priority of your DAGs.
15. Ensure all tasks have the same start_date by default by passing start_date to the DAG at instantiation.
16. Use a static start_date with your DAGs.
   a. A dynamic start_date is misleading, and can cause failures when clearing out failed task instances and missing DAG runs.
17. Set retries as a default_arg applied at the DAG level and get more granular for specific tasks only where necessary.
   a. A good range is 1 to 4 retries.
   Too many retries will add unnecessary load to the Cloud Composer environment.

Example putting all the above together:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    # Default parameters applied to every task in the DAG
    default_args = {
        'owner': 'me',
        'retries': 2,  # within the recommended range of 1 to 4
        'retry_delay': timedelta(minutes=5),
        'sla': timedelta(minutes=7),  # SLA shorter than dagrun_timeout
    }

    # Use the `with` statement to define the DAG object and specify the unique DAG ID and default_args dictionary
    with DAG(
        'dag_id_v1_0_0',  # versioned ID
        default_args=default_args,
        description='This is a detailed description of the DAG',  # detailed description
        start_date=datetime(2022, 1, 1),  # static start date
        dagrun_timeout=timedelta(minutes=10),  # timeout specific to this DAG
        is_paused_upon_creation=True,  # DAG-level setting, not a default_arg
        catchup=False,  # DAG-level setting, not a default_arg
        tags=['example', 'versioned_dag_id'],  # tags specific to this DAG
        schedule_interval=None,
    ) as dag:
        # Define a task using the BashOperator
        task = BashOperator(
            task_id='bash_task',
            bash_command='echo "Hello World"',
        )

18. Define what should occur for each callback function (send an email, log a context, message a Slack channel, etc.). Depending on the DAG, you may be comfortable doing nothing.
   a. success
   b. failure
   c. sla_miss
   d. retry

Example:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.python import PythonOperator


    def on_success_callback(context):
        # when a task in the DAG succeeds
        print(f"Task {context['task_instance_key_str']} succeeded!")


    def on_retry_callback(context):
        # when a task in the DAG retries
        print(f"Task {context['task_instance_key_str']} retrying...")


    def on_failure_callback(context):
        # when a task in the DAG fails
        print(f"Task {context['task_instance_key_str']} failed!")


    def on_sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
        # when tasks in the DAG miss their SLA
        print(f"DAG {dag.dag_id} missed its SLA for tasks: {task_list}")


    default_args = {
        'owner': 'me',
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'sla': timedelta(minutes=7),
        # Task-level callbacks, applied to every task through default_args
        'on_success_callback': on_success_callback,  # what to do on success
        'on_retry_callback': on_retry_callback,  # what to do on retry
        'on_failure_callback': on_failure_callback,  # what to do on failure
    }

    # Create a DAG and set the callbacks
    with DAG(
        'dag_id_v1_0_0',
        default_args=default_args,
        description='This is a detailed description of the DAG',
        start_date=datetime(2022, 1, 1),
        dagrun_timeout=timedelta(minutes=10),
        is_paused_upon_creation=True,
        catchup=False,
        tags=['example', 'versioned_dag_id'],
        schedule_interval=None,
        sla_miss_callback=on_sla_miss_callback,  # what to do on SLA miss (DAG-level)
    ) as dag:

        def example_task(**kwargs):
            # This is an example task that will be part of the DAG
            print(f"Running example task with context: {kwargs}")

        # Create a task and add it to the DAG
        task = PythonOperator(
            task_id="example_task",
            python_callable=example_task,
        )

19. Use Task Groups to organize Tasks.

Example:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.utils.task_group import TaskGroup

    default_args = {'owner': 'me'}

    # Use the `with` statement to define the DAG object and specify the unique DAG ID and default_args dictionary
    with DAG(
        'example_dag',
        default_args=default_args,
        start_date=datetime(2022, 1, 1),
        schedule_interval=timedelta(hours=1),
    ) as dag:
        # Define the first task group
        with TaskGroup(group_id='task_group_1') as tg1:
            # Define the first task in the first task group
            task_1_1 = BashOperator(
                task_id='task_1_1',
                bash_command='echo "Task 1.1"',
            )

Reduce the Load on Your Composer Environment

20. Use Jinja Templating / Macros instead of Python functions.
   a. Airflow’s template fields allow you to incorporate values from environment variables and Jinja templates into your DAGs. This helps make your DAGs idempotent (meaning multiple invocations do not change the result) and prevents unnecessary function execution during Scheduler heartbeats.
   b. The Airflow engine passes a few variables by default that are accessible in all templates.

Contrary to best practices, the following example defines variables based on datetime Python functions:

    # Variables used by tasks
    # Bad example - Define today's and yesterday's date using datetime module
    today = datetime.today()
    yesterday = datetime.today() - timedelta(1)

If this code is in a DAG file, these functions execute on every Scheduler heartbeat (each time the DAG file is parsed), which may not be performant. Even more importantly, this doesn’t produce an idempotent DAG.
You can’t rerun a previously failed DAG run for a past date because datetime.today() is relative to the current date, not the DAG execution date.

A better way of implementing this is to use one of Airflow’s built-in template variables:

    # Variables used by tasks
    # Good example - Define yesterday's date with an Airflow template variable
    yesterday = '{{ yesterday_ds_nodash }}'  # rendered at run time when passed to a templated field

21. Avoid creating your own additional Airflow Variables.
   a. The metadata database stores these variables and requires database connections to retrieve them. This can affect the performance of the Cloud Composer environment. Use environment variables or Google Cloud Secret Manager instead.
22. Avoid running all DAGs on the exact same schedules (disperse workload as much as possible).
   a. Prefer cron expressions for schedule intervals over Airflow macros or timedeltas. This allows a more rigid schedule and makes it easier to spread workloads throughout the day, which is easier on your Cloud Composer environment.
   b. Crontab.guru can help with generating specific cron expression schedules and provides examples.

Examples:

    schedule_interval="*/5 * * * *",  # every 5 minutes

    schedule_interval="0 */6 * * *",  # at minute 0 of every 6th hour

23. Avoid XComs except for small amounts of data.
   a. These add storage and introduce more connections to the database.
   b. If absolutely necessary, use JSON dicts as values (one connection for many values inside the dict).
24. Avoid adding unnecessary objects in the dags/ Google Cloud Storage path.
   a. If you must, add an .airflowignore file to GCS paths that the Airflow Scheduler does not need to parse (SQL, plug-ins, etc.).
25. Set execution timeouts for tasks.

Example:

    # Use the `PythonOperator` to define the task
    task = PythonOperator(
        task_id='my_task',
        python_callable=my_task_function,
        execution_timeout=timedelta(minutes=30),  # Set the execution timeout to 30 minutes
        dag=dag,
    )

26. Use Deferrable Operators over Sensors when possible.
   a. A deferrable operator can suspend itself and free up the worker when it knows it has to wait, and hand off the job of resuming it to a Trigger. As a result, while it is suspended (deferred), it does not take up a worker slot, and your cluster wastes fewer resources on idle Operators or Sensors.

Example:

    PYSPARK_JOB = {
        "reference": {"project_id": "PROJECT_ID"},
        "placement": {"cluster_name": "PYSPARK_CLUSTER_NAME"},
        "pyspark_job": {
            "main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
        },
    }

    DataprocSubmitJobOperator(
        task_id="dataproc-deferrable-example",
        job=PYSPARK_JOB,
        deferrable=True,
    )

27. When using Sensors, always define mode, poke_interval, and timeout.
   a. Sensors require Airflow workers to run.
   b. Sensor checking every n seconds (i.e. poke_interval < 60)? Use mode=poke. A sensor in mode=poke will continuously poll every n seconds and hold Airflow worker resources.
   c. Sensor checking every n minutes (i.e. poke_interval >= 60)? Use mode=reschedule.
   A sensor in mode=reschedule will free up Airflow worker resources between poke intervals.

Example:

    table_partition_sensor = BigQueryTablePartitionExistenceSensor(
        project_id="{{ project_id }}",
        task_id="bq_check_table_partition",
        dataset_id="{{ dataset }}",
        table_id="comments_partitioned",
        partition_id="{{ ds_nodash }}",
        mode="reschedule",
        poke_interval=60,
        timeout=60 * 5,
    )

28. Offload processing to external services (BigQuery, Dataproc, Cloud Functions, etc.) to minimize load on the Cloud Composer environment.
   a. These services usually have their own Airflow Operators for you to utilize.
29. Do not use sub-DAGs.
   a. Sub-DAGs were a feature in older versions of Airflow that allowed users to create reusable groups of tasks within DAGs. However, Airflow 2.0 deprecated sub-DAGs because they caused performance and functional issues.
30. Use Pub/Sub for DAG-to-DAG dependencies.
   a. This approach also covers multi-cluster DAG-to-DAG dependencies.
31. Make DAGs load faster.
   a. Avoid unnecessary “top-level” Python code. DAGs with many imports, variables, and functions outside of the DAG will introduce greater parse times for the Airflow Scheduler and in turn reduce the performance and scalability of Cloud Composer / Airflow.
   b. Moving expensive imports into the functions that use them can reduce parse time (on the order of seconds), because code inside a task callable runs only when the task executes.
   c. Ensure that developed DAGs do not increase DAG parse times too much.

Example:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    # Define default_args dictionary
    default_args = {
        'owner': 'me',
        'start_date': datetime(2022, 11, 17),
    }

    # Use with statement and DAG context manager to instantiate the DAG
    with DAG(
        'my_dag_id',
        default_args=default_args,
        schedule_interval=timedelta(days=1),
    ) as dag:
        # Define function within DAG block
        def greet():  # DO THIS
            # Import inside the callable so it runs at task execution, not at parse time
            import my_module  # DO THIS

            greeting = my_module.generate_greeting()
            print(greeting)

        # Use the PythonOperator to execute the function
        greet_task = PythonOperator(
            task_id='greet_task',
            python_callable=greet,
        )

Improve Development and Testing

32. Implement “self-checks” (via Sensors or Deferrable Operators).
   a. To ensure that tasks are functioning as expected, you can add checks to your DAG.
   For example, if a task pushes data to a BigQuery partition, you can add a check in the next task to verify that the partition was generated and that the data is correct.

Example:

    # ------------------------------------------------------------
    # Transform source data and transfer to partitioned table
    # ------------------------------------------------------------

    create_or_replace_partitioned_table_job = BigQueryInsertJobOperator(
        task_id="create_or_replace_comments_partitioned_query_job",
        configuration={
            "query": {
                "query": 'sql/create_or_replace_comments_partitioned.sql',
                "useLegacySql": False,
            }
        },
        location="US",
    )

    create_or_replace_partitioned_table_job_error = dummy_operator.DummyOperator(
        task_id="create_or_replace_partitioned_table_job_error",
        trigger_rule="one_failed",
    )

    create_or_replace_partitioned_table_job_ok = dummy_operator.DummyOperator(
        task_id="create_or_replace_partitioned_table_job_ok", trigger_rule="one_success"
    )

    # ------------------------------------------------------------
    # Determine if today's partition exists in comments_partitioned
    # ------------------------------------------------------------

    table_partition_sensor = BigQueryTablePartitionExistenceSensor(
        project_id="{{ project_id }}",
        task_id="bq_check_table_partition",
        dataset_id="{{ dataset }}",
        table_id="comments_partitioned",
        partition_id="{{ ds_nodash }}",
        mode="reschedule",
        poke_interval=60,
        timeout=60 * 5,
    )

    create_or_replace_partitioned_table_job >> [
        create_or_replace_partitioned_table_job_error,
        create_or_replace_partitioned_table_job_ok,
    ]
    create_or_replace_partitioned_table_job_ok >> table_partition_sensor

33. Look for opportunities to dynamically generate similar tasks/task groups/DAGs via Python code.
   a. This can simplify and standardize the development process for DAGs.
Example:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator


    def create_dag(dag_id, default_args, task_1_func, task_2_func):
        with DAG(dag_id, default_args=default_args) as dag:
            task_1 = PythonOperator(
                task_id='task_1',
                python_callable=task_1_func,
            )
            task_2 = PythonOperator(
                task_id='task_2',
                python_callable=task_2_func,
            )
            task_1 >> task_2
        return dag


    def task_1_func():
        print("Executing task 1")


    def task_2_func():
        print("Executing task 2")


    default_args = {
        'owner': 'me',
        'start_date': datetime(2022, 1, 1),  # static start date (see item 16)
    }

    # Assign the DAG to a module-level name so the Scheduler can discover it
    my_dag = create_dag(
        dag_id='my_dag_id',
        default_args=default_args,
        task_1_func=task_1_func,
        task_2_func=task_2_func,
    )

34. Implement unit testing for your DAGs.

Example:

    from airflow import models
    from airflow.utils.dag_cycle_tester import check_cycle


    def assert_has_valid_dag(module):
        """Assert that a module contains a valid DAG."""

        no_dag_found = True

        for dag in vars(module).values():
            if isinstance(dag, models.DAG):
                no_dag_found = False
                check_cycle(dag)  # Throws if a task cycle is found.

        if no_dag_found:
            raise AssertionError('module does not contain a valid DAG')

35. Perform local development via the Composer Local Development CLI Tool.
   a. The Composer Local Development CLI tool streamlines Apache Airflow DAG development for Cloud Composer 2 by running an Airflow environment locally. This local Airflow environment uses an image of a specific Cloud Composer version.
36. If possible, keep a staging Cloud Composer environment to fully test the complete DAG run before deploying to production.
   a. Parameterize your DAG to change the variables, e.g., the output path of a Google Cloud Storage operation or the database used to read the configuration. Do not hard-code values inside the DAG and then change them manually according to the environment.
37. Use a Python linting tool such as Pylint or Flake8 for standardized code.
38. Use a Python formatting tool such as Black or YAPF for standardized code.

Next Steps

In summary, this blog provides a comprehensive checklist of best practices for developing Airflow DAGs for use in Google Cloud Composer. By following these best practices, developers can help ensure that Cloud Composer is working optimally and that their DAGs are well-organized and easy to manage.

For more information about Cloud Composer, check out the following related blog posts and documentation pages:

What is Cloud Composer?
Deutsche Bank uses Cloud Composer workload automation
Using Cloud Build to keep Airflow Operators up-to-date in your Composer environment
Writing DAGs (workflows) | Cloud Composer
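Item 22 recommends dispersing DAG schedules with cron expressions. One way to do that consistently is to derive each DAG’s minute offset from its ID. The sketch below is a minimal illustration in plain Python, not a Cloud Composer or Airflow feature: staggered_cron and its every_hours parameter are hypothetical names, the sample DAG IDs are made up, and the helper assumes every_hours divides 24 evenly.

```python
import hashlib


def staggered_cron(dag_id: str, every_hours: int = 6) -> str:
    """Build a cron expression that runs every `every_hours` hours.

    The minute (0-59) is derived from a stable hash of the DAG ID, so DAGs
    sharing the same cadence do not all start at minute 0.
    """
    if every_hours < 1 or 24 % every_hours != 0:
        raise ValueError("every_hours must be a divisor of 24")
    # hashlib is stable across processes, unlike the built-in hash().
    digest = hashlib.sha256(dag_id.encode("utf-8")).hexdigest()
    minute = int(digest, 16) % 60
    hour_field = "*" if every_hours == 1 else f"*/{every_hours}"
    return f"{minute} {hour_field} * * *"


# Hypothetical DAG IDs following the versioned naming convention.
for dag_id in ("team_a_sales_load_v1_0_0", "team_b_ads_export_v1_0_0"):
    print(dag_id, "->", staggered_cron(dag_id))
```

The returned string can be passed as schedule_interval when the DAG is instantiated. Because the offset depends only on the DAG ID, it stays the same across Scheduler restarts and parses.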
Source: Google Cloud Platform

Web and data access events from Amazon FinSpace now available in AWS CloudTrail

Amazon FinSpace now offers customers additional options for monitoring user activity by logging web application and data access events to AWS CloudTrail. Amazon FinSpace is a managed data analytics hub for capital markets customers that enables analysts and data engineers to access data from multiple sources and transform it using the FinSpace managed Apache Spark engine with its capital markets time series analytics library. When users perform an action in the FinSpace web application or use data stored in their FinSpace environment, an event is published to the FinSpace environment’s audit repository. It can then be retrieved with the audit report viewer hosted in the FinSpace web application, giving FinSpace administrators a convenient way to quickly view user activity and data access events.
Source: aws.amazon.com

AWS DataSync adds support for using tags with task executions

AWS DataSync now supports using tags with task executions. A DataSync task defines where and how data is transferred using AWS DataSync. A task execution is a single run of a task. With this new feature, you can apply tags each time a task runs, giving you better control over and management of your task executions.
Source: aws.amazon.com

Amazon Route 53 now offers threat intelligence from Recorded Future

Amazon Route 53 Resolver DNS Firewall has added threat intelligence from Recorded Future, broadening the range of DNS threats you can block using DNS Firewall’s AWS Managed Domain Lists. The Recorded Future domain risk list contains over 100,000 domains with dynamic risk scores that are updated as new threats are identified and are continuously added to DNS Firewall. The Recorded Future Intelligence Cloud uses sandbox analysis, network traffic analysis, and command-and-control detection to identify potentially malicious domains from news sites, blogs, the dark web, TOR sites, underground forums, and other external sources.
Source: aws.amazon.com

AWS Marketplace launches free trials for SaaS products with usage-based pricing

AWS Marketplace announces free trials for SaaS products with usage-based pricing, an extension of the free trials for SaaS contracts that launched on May 31, 2022. With free trials in AWS Marketplace, independent software vendors (ISVs) can grow their business by reducing sign-up friction and offering prospects the product to try.
Source: aws.amazon.com

AWS Glue crawlers improve support for Delta Lake tables

AWS Glue crawlers now offer enhanced support for Linux Foundation Delta Lake tables, increasing operational efficiency in deriving meaningful insights from analytics services such as Amazon Athena, Amazon EMR, and AWS Glue. This feature enables analytics services to scan Delta Lake tables without requiring Glue crawlers to create manifest files. Newly cataloged data is now quickly available for analysis with your preferred analytics and machine learning (ML) tools.
Source: aws.amazon.com