GCP Dataflow API Example
Let’s create a streaming pipeline using Dataflow templates. We’ll use the Pub/Sub to BigQuery template, which reads JSON-formatted messages from a Pub/Sub topic and writes them to a BigQuery table.
Create a dataset called taxirides:
bq mk taxirides
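If you want to verify the dataset was created, you can list the datasets in your current project (an optional sanity check, not part of the pipeline itself):
# List all datasets in the active project; taxirides should appear
bq ls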
Create a BigQuery table in that dataset:
bq mk \
--time_partitioning_field timestamp \
--schema ride_id:string,point_idx:integer,latitude:float,longitude:float,\
timestamp:timestamp,meter_reading:float,meter_increment:float,ride_status:string,\
passenger_count:integer -t taxirides.realtime
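To confirm the schema and time partitioning were applied as expected, you can inspect the new table (optional):
# Print a summary of the table, including its schema and partitioning settings
bq show taxirides.realtime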
Now we create a storage bucket to hold temporary data:
export BUCKET_NAME="Bucket Name"
gsutil mb gs://$BUCKET_NAME/
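Keep in mind that bucket names must be globally unique and cannot contain spaces, so replace the placeholder with something like your project ID plus a suffix. To confirm the bucket was created (optional):
# Show the bucket URL to verify it exists
gsutil ls -b gs://$BUCKET_NAME/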
Finally, deploy the Dataflow template (note that outputTableSpec takes the form "Project ID":dataset.table):
gcloud dataflow jobs run iotflow \
    --gcs-location gs://dataflow-templates-"Region"/latest/PubSub_to_BigQuery \
    --region "Region" \
    --worker-machine-type e2-medium \
    --staging-location gs://"Bucket Name"/temp \
    --parameters inputTopic=projects/pubsub-public-data/topics/taxirides-realtime,outputTableSpec="Project ID":taxirides.realtime
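After the job is submitted, it can take a few minutes before rows start arriving in BigQuery. You can check the job's state from the command line (using the same Region placeholder as above):
# List active Dataflow jobs in the region; iotflow should show as Running
gcloud dataflow jobs list --region "Region" --status active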
Now we can submit queries using SQL:
SELECT * FROM `"Project ID".taxirides.realtime` LIMIT 1000
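The same query can also be run from Cloud Shell with the bq tool instead of the BigQuery console; here the default project is assumed, so the table can be referenced as dataset.table, and a smaller limit keeps the output readable:
# Run the query using standard SQL from the command line
bq query --use_legacy_sql=false 'SELECT * FROM taxirides.realtime LIMIT 10'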