To all Apache Beam and Dataflow users:

If you've experimented with Beam, prototyped a pipeline, or verified assumptions about a dataset, you might have used Beam Notebooks or other interactive alternatives such as Google Colab or Jupyter Notebooks.

You might also have noticed a gap between running a small prototype pipeline in a notebook and running a production pipeline on Dataflow. What if you want to interactively process and inspect aggregations of bigger production datasets from within the notebook, at scale? You cannot rely on the single machine that runs your notebook to execute the pipeline, because it lacks the capacity.

Allow me to introduce Interactive FlinkRunner on notebook-managed clusters. It lets you execute pipelines at scale and inspect the results interactively. Under the hood, it uses Dataproc with its Flink and Docker components to provision long-lasting clusters.

This post introduces Interactive FlinkRunner using three examples:

- A starter word count example with a small notebook-managed cluster.
- An example using a much bigger cluster to process tens of millions of flight records to see how many flights are delayed for each airline.
- An example reusing the bigger cluster to run ML inference against 50,000 images with a pre-trained model, all from within a notebook.

The starter example costs ~$1/hr and the other two cost ~$20/hr (estimated from Dataproc pricing and VM instance pricing). The actual cost may vary. To control the cost of these examples, you can use pipeline options and smaller source data to reduce the size of the data and the cluster.

Prerequisites

Once you have Beam Notebooks instantiated, create an empty notebook (ipynb) file and open it with a notebook kernel selected.
Always use a notebook/IPython kernel with a newer Beam version to take advantage of bug fixes, optimizations, and new features. For Dataflow-hosted Beam Notebooks, use notebook kernels with Beam versions >= 2.40.0.

To get started, check whether your project has the necessary services activated and permissions granted. You can find relevant information about the current user by executing the following in the notebook.

```
# Describe the user currently authenticated.
!gcloud iam service-accounts describe $(gcloud config get-value account)

# List the IAM roles granted to the user. If it's already a Project Editor,
# it should have all required IAM permissions. Otherwise, ask a project
# admin for missing grants if you encounter any permission issues in the examples.
!gcloud projects get-iam-policy $(gcloud config get-value project) \
  --flatten="bindings[].members" \
  --format='table(bindings.role)' \
  --filter="bindings.members:$(gcloud config get-value account)"
```

Interactive Flink on notebook-managed clusters uses Dataproc under the hood, so the Dataproc service must be enabled.

```
!gcloud services enable dataproc.googleapis.com
```

A starter example – Word Count

You've probably already seen the word count example multiple times.
You know how to process and inspect the counted words with an InteractiveRunner or a DirectRunner on a single machine. You can also run the pipeline on Dataflow as a one-shot job from within the same notebook without copying and pasting, moving across workspaces, or setting up the Cloud SDK.

To run it interactively with Flink on a notebook-managed cluster, you only need to change the runner and optionally modify some pipeline options.

The notebook-managed Flink cluster is configurable through pipeline options. You need these imports for this and the other examples.

```python
# apache_beam and interactive_beam are needed by the snippets below
# (beam.Pipeline, ib.options, ib.show).
import apache_beam as beam
from apache_beam.options.pipeline_options import FlinkRunnerOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import PortableOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.portability.flink_runner import FlinkRunner
```

You can then set up the configurations for development and execution.

```python
import logging
logging.getLogger().setLevel(logging.ERROR)

import google.auth
project = google.auth.default()[1]

# IMPORTANT! Adjust the following to choose a Cloud Storage location.
# Used to cache source recordings and computed PCollections.
ib.options.cache_root = 'gs://YOUR-BUCKET/'

# Define an InteractiveRunner that uses the FlinkRunner under the hood.
interactive_flink_runner = InteractiveRunner(underlying_runner=FlinkRunner())

# Set up the Apache Beam pipeline options.
options = PipelineOptions()
options.view_as(GoogleCloudOptions).project = project
```

Above are the minimum configurations needed; you'll further customize them in later examples.

You can find the source code of the word count example here. Modify it with the interactive_flink_runner to build the pipeline in the notebook. The example uses gs://apache-beam-samples/shakespeare/kinglear.txt as the input file.

Inspecting the PCollection counts implicitly starts a Flink cluster, executes the pipeline, and renders the result in the notebook.

Example 2 – Find out how many flights are delayed

This example reads more than 17 million records from a public BigQuery dataset, bigquery-samples.airline_ontime_data.flights, and counts how many flights have been delayed since 2010 for all the airlines.

On a normal InteractiveRunner running directly on a single notebook instance, reading and processing could take more than an hour because of the number of records (though the data is relatively small, ~1GB), and the pipeline can run out of memory or disk space when the data is even bigger.
With interactive Flink on notebook-managed clusters, you work with higher capacity and performance (~4 mins for the example) while still being able to construct the pipeline step by step and inspect the results one by one within a notebook.

You need to have the BigQuery service activated.

```
!gcloud services enable bigquery.googleapis.com
```

Configure a much bigger cluster with the options below. To limit the records read, you may add "LIMIT 1000" or a similar constraint to the BigQuery read query. Based on the size of the data read from BigQuery, you may reduce the values of the options.

```python
# Use cloudpickle to alleviate the burden of staging things in the main module.
options.view_as(SetupOptions).pickle_library = 'cloudpickle'
# As a rule of thumb, the Flink cluster has about vCPU * #TMs = 8 * 40 = 320 slots.
options.view_as(WorkerOptions).machine_type = 'n1-highmem-8'
options.view_as(WorkerOptions).num_workers = 40
```

Whenever you inspect the result of a PCollection through ib.show() or ib.collect() in a notebook, Beam implicitly runs a fragment of the pipeline to compute the data.
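The rule of thumb in the cluster options above can be worked through in a few lines. This is only a sketch: `estimate_slots` and `fits_in_slots` are hypothetical helper names, not Beam or Flink APIs, and the one-slot-per-vCPU assumption comes from the code comment above rather than from a measured cluster.

```python
def estimate_slots(vcpus_per_worker: int, num_workers: int) -> int:
    """Rule-of-thumb upper bound: one task slot per vCPU on each worker."""
    return vcpus_per_worker * num_workers


def fits_in_slots(parallelism: int, slots: int) -> bool:
    """True when a per-step parallelism does not exceed the estimated slots."""
    return parallelism <= slots


# n1-highmem-8 has 8 vCPUs; the options above request 40 workers.
slots = estimate_slots(vcpus_per_worker=8, num_workers=40)
print(slots)  # 320

# Two candidate parallelism values, chosen for illustration.
print(fits_in_slots(parallelism=150, slots=slots))  # True
print(fits_in_slots(parallelism=400, slots=slots))  # False
```

If you shrink the cluster to control cost, rerunning the same arithmetic gives a rough ceiling for the parallelism worth requesting.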
You can adjust the parallelism of the execution interactively.

```python
# The parallelism is applied to each step, so if your pipeline has 10 steps, you
# end up having 150 * 10 tasks scheduled that can theoretically be executed in parallel by
# the 320 (upper bound) slots/workers/threads.
options.view_as(FlinkRunnerOptions).parallelism = 150
```

With the above configurations, when you inspect data in the notebook, you are instructing Beam to implicitly start or reuse a Flink cluster on Google Cloud (Dataproc under the hood) with 40 VMs and run pipelines with parallelism set to 150.

```python
options.view_as(GoogleCloudOptions).temp_location = ib.options.cache_root
bq_p = beam.Pipeline(runner=interactive_flink_runner, options=options)

delays_by_airline = (
    bq_p
    | 'Read Dataset from BQ' >> beam.io.ReadFromBigQuery(
        project=project, use_standard_sql=True,
        # Read 17,692,149 records, ~1GB worth of data
        query=('SELECT airline, arrival_delay '
               'FROM `bigquery-samples.airline_ontime_data.flights` '
               'WHERE date >= "2010-01-01"'))
    | 'Rebalance Data to TM Slots' >> beam.Reshuffle(num_buckets=1000)
    | 'Extract Delay Info' >> beam.Map(
        lambda e: (e['airline'], e['arrival_delay'] > 0))
    | 'Filter Delayed' >> beam.Filter(lambda e: e[1])
    | 'Count Delayed Flights Per Airline' >> beam.combiners.Count.PerKey())
```

You can include visualize_data=True when inspecting data through ib.show().
Binning the visualized data by their count, you can see that the WN airline has the most delayed flights recorded in the dataset.

Example 3 – Run ML inference at scale interactively

The RunInference example classifies 50,000 image files (~280GB) from within the notebook.

The workload normally takes half a day for a single notebook instance or worker. With interactive Flink on notebook-managed clusters, it shows the result in under two minutes. Looking at the Flink job dashboard, the actual inference only took a dozen seconds. The rest of the running time is overhead from staging the job, scheduling the tasks, writing the aggregated result to ib.options.cache_root, transferring the result back to the notebook, and rendering it in the browser.

Setup

For the RunInference example, you need to build a container image. You can find more information about building a container image from a notebook in this guide.

The extra Python dependencies needed for this example are:

```
%pip install torch
%pip install torchvision
%pip install pillow
%pip install transformers
```

The example uses the validation image set from ImageNet and the PyTorch pre-trained MobileNetV2 model. You can download similar dependencies or use your own image dataset and model. Make sure you copy the pre-trained model to the container and use its file path in the Beam pipeline.
You can find many image datasets from places such as ImageNet or COCO (Common Objects in Context) and pre-trained models such as MobileNetV2 in the ImageNet Models package.

Configure the pipeline options to use the custom container you build.

```python
options.view_as(PortableOptions).environment_config = f'gcr.io/{project}/flink'
```

Build the pipeline

To run inference with a Beam pipeline, you need the following imports:

```python
import io
from typing import Iterable
from typing import Optional
from typing import Tuple

import torch
from PIL import Image
from torchvision import models
from torchvision import transforms
from torchvision.models.mobilenetv2 import MobileNetV2

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
```

Then you can define processing logic for each step of the pipeline.
You can use a mixture of DoFns and normal functions that yield or return, and later incorporate them into the pipeline with different transforms.

```python
def filter_empty_text(text: str) -> Iterable[str]:
    if len(text.strip()) > 0:
        yield text

def preprocess_image(data: Image.Image) -> torch.Tensor:
    image_size = (224, 224)
    # Pre-trained PyTorch models expect input images normalized with the
    # below values (see: https://pytorch.org/vision/stable/models.html)
    normalize = transforms.Normalize(
        mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    transform = transforms.Compose([
        transforms.Resize(image_size),
        transforms.ToTensor(),
        normalize,
    ])
    return transform(data)

def read_image(image_file_name: str) -> Tuple[str, torch.Tensor]:
    with FileSystems().open(image_file_name, 'r') as file:
        data = Image.open(io.BytesIO(file.read())).convert('RGB')
        return image_file_name, preprocess_image(data)

class PostProcessor(beam.DoFn):
    def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:
        filename, prediction_result = element
        prediction = torch.argmax(prediction_result.inference, dim=0)
        yield str(prediction.item())
```

Now define a few variables.

```python
# Replace this with a file containing paths to your image files.
image_file_names = 'gs://runinference/it_mobilenetv2_imagenet_validation_inputs.txt'
model_state_dict_path = '/tmp/mobilenet_v2.pt'
model_class = MobileNetV2
model_params = {'num_classes': 1000}

# In this example we pass keyed inputs to the RunInference transform.
# Therefore, we use KeyedModelHandler wrapper over PytorchModelHandler.
model_handler = KeyedModelHandler(
    PytorchModelHandlerTensor(
        state_dict_path=model_state_dict_path,
        model_class=model_class,
        model_params=model_params))
```

And build the pipeline with the above building blocks.

```python
pipeline = beam.Pipeline(interactive_flink_runner, options=options)

counts = (
    pipeline
    | 'Read Image File Names' >> beam.io.ReadFromText(
        image_file_names)
    | 'Filter Empty File Names' >> beam.ParDo(filter_empty_text)
    | 'Shuffle Files to Read' >> beam.Reshuffle(num_buckets=900)
    | 'Read Image Data' >> beam.Map(read_image)
    | 'PyTorch Run Inference' >> RunInference(model_handler)
    | 'Process Output' >> beam.ParDo(PostProcessor())
    | 'Count Per Classification' >> beam.combiners.Count.PerElement())

# Further increase the parallelism from the starter example.
options.view_as(FlinkRunnerOptions).parallelism = 300
```

The pipeline reads a text file with 50,000 image file names in it. The Reshuffle is necessary to rebalance the image file names to all the workers before reading the image files. Without it, all 50,000 files will be read from a single task/thread/worker no matter how high the parallelism is.

Once read, each image will be classified into 1 of 1000 classes (e.g., a cat, a dog, a flower). The final aggregation counts how many images there are for each class.

In notebooks, Beam tries to cache the computed data of each PCollection that is assigned to a variable defined in the main module or watched by ib.watch({'pcoll_name': pcoll}). Here, to speed everything up, you only assign the final aggregation to a PCollection variable named counts, as it's the only data worth inspecting.

To inspect the data, you can use either ib.show or ib.collect. If it's the first time you inspect the data, a Flink cluster is implicitly started. For later inspections, computed PCollections do not incur executions.
For inspections of data by newly appended transforms, the same cluster will be reused (unless instructed otherwise).

You can also inspect the cluster by running ib.clusters.describe(pipeline). You can follow the link in the output to the Flink dashboard, where you can review finished jobs or future running jobs.

In this run, the process took 1m45s to run inference for 50,000 images (~280GB).

You can further enrich the data if you know the mappings between classifications and their human-readable labels.

```python
idx_to_label = pipeline | 'A sample class idx to label' >> beam.Create(list({
    '242': 'boxer',
    '243': 'bull mastiff',
    '244': 'Tibetan mastiff',
    '245': 'French bulldog',
    '246': 'Great Dane',
    '247': 'Saint Bernard, St Bernard',
    '248': 'Eskimo dog, husky',
    '249': 'malamute, malemute, Alaskan malamute',
    '250': 'Siberian husky',
    '251': 'dalmatian, coach dog, carriage dog',
    '252': 'affenpinscher, monkey pinscher, monkey dog',
    '253': 'basenji',
    '254': 'pug, pug-dog',
}.items()))

def cross_join(idx_count, idx_labels):
    idx, count = idx_count
    if idx in idx_labels:
        return {'class': idx, 'label': idx_labels[idx], 'count': count}

label_counts = (
    counts
    | 'Enrich with human-readable labels' >> beam.Map(
        cross_join, idx_labels=beam.pvalue.AsDict(idx_to_label))
    | 'Keep only enriched data' >> beam.Filter(lambda x: x is not None))
```

When you inspect label_counts, the computed counts are reused for the newly added transforms. After an aggregation, the output data size can be tiny compared with the input data. High parallelism does not help with processing small data and could introduce unnecessary overhead.
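The enrichment step above is a lookup of each (class index, count) pair in a label dictionary, dropping pairs without a known label. A minimal plain-Python sketch of that logic, with no Beam required: the `sample_counts` and `sample_labels` values are made up for illustration and are not results from the pipeline.

```python
from typing import Dict, Optional, Tuple


def cross_join(idx_count: Tuple[str, int],
               idx_labels: Dict[str, str]) -> Optional[dict]:
    """Attach a human-readable label to a (class index, count) pair."""
    idx, count = idx_count
    if idx in idx_labels:
        return {'class': idx, 'label': idx_labels[idx], 'count': count}
    return None  # No label known for this class index.


# Made-up inputs for illustration only; real counts come from the pipeline.
sample_counts = [('242', 3), ('999', 7), ('254', 1)]
sample_labels = {'242': 'boxer', '254': 'pug, pug-dog'}

# Equivalent of the Map followed by the Filter that keeps non-None rows.
enriched = [
    row for row in (cross_join(c, sample_labels) for c in sample_counts)
    if row is not None
]
print(enriched)
# [{'class': '242', 'label': 'boxer', 'count': 3},
#  {'class': '254', 'label': 'pug, pug-dog', 'count': 1}]
```

In the Beam version, the dictionary is supplied as a side input through beam.pvalue.AsDict, so every worker sees the same label mapping.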
You can interactively tune down the parallelism to inspect the result of processing only a handful of elements with the newly added transform.

Clean Up

Execute the code below to clean up clusters created by the notebook and avoid unintended charges.

```python
ib.clusters.cleanup(force=True)
```

Optionally, you can go to the Dataproc UI to manually manage your clusters.

Open Source Support

Apache Beam is open source software. The interactive features work with all IPython kernel-backed notebook runtimes. This also means the interactive FlinkRunner feature can be adapted to your own notebook and cluster setups.

For example, you can use Google Colab (a free alternative to Dataflow-hosted Beam Notebooks) connected with a local runtime (kernel) on your own workstation and then interactively submit jobs to a Flink cluster that you host and manage.

- Set up Google Colab with local runtime
- Set up a Flink cluster locally

To use your own Flink cluster, specify the necessary options:

```python
flink_options = options.view_as(FlinkRunnerOptions)
flink_options.flink_master = 'localhost:8081'  # Or any resolvable URL of your cluster
flink_options.flink_version = '1.12'  # Or the version of Flink you use
```

If you use Beam built from source code (a dev version), you can configure a compatible container image.

```python
# Or any custom container you build to run the Python code you define.
options.view_as(PortableOptions).environment_config = 'apache/beam_python3.8_sdk:2.41.0'
```

Now you can run Beam pipelines interactively at scale on your own setup.

Compatibility

Interactive Flink features are not patched back to older versions of (Interactive) Beam. Here is a compatibility table.

| Beam Versions | Dataflow-hosted Beam Notebooks | Other notebook and cluster setups |
| --- | --- | --- |
| <2.40.0 | Not supported | Not supported |
| >=2.40.0,<2.43.0 | Supported | Parallelism fixed to 1 |
| >=2.43.0 | Supported | Supported |

There is also a cluster manager UI widget in the JupyterLab extension apache-beam-jupyterlab-sidepanel. Dataflow-hosted Beam Notebooks have it pre-installed. If you use your own JupyterLab setup, you can install it from either NPM or source code. It's not supported in other notebook runtime environments such as Colab or classic Jupyter Notebooks.

Next Steps

Go to the Vertex AI workbench and get started using Dataflow-hosted Beam Notebooks! You can create, share, and collaborate on your notebooks with ease. You also have the flexibility to control who can access your notebook and what resources to use any time you want to make a change.

For the interactive Flink feature, check the public documentation for tips, caveats, and FAQs when you run into issues.

Your feedback, suggestions, and open source contributions are welcome.
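As a quick reference, the compatibility table in this post can be encoded as a small lookup. This is a sketch under stated assumptions: `parse_version` and `interactive_flink_support` are hypothetical helpers written for this illustration, not part of Beam, and the version parsing simply ignores any non-numeric suffix.

```python
from typing import Tuple


def parse_version(version: str) -> Tuple[int, ...]:
    """Turn '2.41.0' into (2, 41, 0), padding to three numeric parts."""
    parts = []
    for piece in version.split('.'):
        if not piece.isdigit():
            break  # Stop at suffixes such as 'dev' or '0rc1'.
        parts.append(int(piece))
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)


def interactive_flink_support(beam_version: str, dataflow_hosted: bool) -> str:
    """Return the table cell for a Beam version and notebook setup."""
    version = parse_version(beam_version)
    if version < (2, 40, 0):
        return 'Not supported'
    if version < (2, 43, 0):
        return 'Supported' if dataflow_hosted else 'Parallelism fixed to 1'
    return 'Supported'


print(interactive_flink_support('2.41.0', dataflow_hosted=True))   # Supported
print(interactive_flink_support('2.41.0', dataflow_hosted=False))  # Parallelism fixed to 1
print(interactive_flink_support('2.39.0', dataflow_hosted=True))   # Not supported
```

In a notebook you could pass `beam.__version__` as the first argument to see which row applies to the selected kernel.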
Source: Google Cloud Platform