
GCP Dataflow API Example

Let’s create a streaming pipeline using Dataflow templates. We’ll use the Pub/Sub to BigQuery template, which reads JSON messages from a Pub/Sub topic and streams them into a BigQuery table.
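
To get a feel for the data before building anything, you can attach a subscription in your own project to the public taxirides topic and pull a single message. This is a quick sketch; the subscription name taxi-preview is just an illustrative choice:

# Create a subscription against the public taxirides topic
gcloud pubsub subscriptions create taxi-preview \
    --topic projects/pubsub-public-data/topics/taxirides-realtime

# Pull one message, acknowledge it, and print the JSON payload
gcloud pubsub subscriptions pull taxi-preview --limit 1 --auto-ack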

Create a dataset called taxirides:

bq mk taxirides
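
To double-check that the dataset was created, list the datasets in the current project and look for taxirides:

bq ls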

Next, create a table in that dataset with the schema of the taxi rides stream:

bq mk \
--time_partitioning_field timestamp \
--schema ride_id:string,point_idx:integer,latitude:float,longitude:float,\
timestamp:timestamp,meter_reading:float,meter_increment:float,ride_status:string,\
passenger_count:integer -t taxirides.realtime  
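
If you want to confirm the table landed with the expected schema, bq show can print it (the prettyjson format is optional, just easier to read):

bq show --schema --format=prettyjson taxirides.realtime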

Now we create a storage bucket to hold temporary data:

export BUCKET_NAME="Bucket Name"
gsutil mb gs://$BUCKET_NAME/
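
Note that real bucket names must be globally unique and cannot contain spaces, so substitute your own value for the placeholder. A quick sanity check that the bucket exists:

gsutil ls -b gs://$BUCKET_NAME/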

Finally, run a Dataflow job from the template:

gcloud dataflow jobs run iotflow \
    --gcs-location gs://dataflow-templates-"Region"/latest/PubSub_to_BigQuery \
    --region "Region" \
    --worker-machine-type e2-medium \
    --staging-location gs://"Bucket Name"/temp \
    --parameters inputTopic=projects/pubsub-public-data/topics/taxirides-realtime,outputTableSpec="Project ID":taxirides.realtime
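
Worker startup takes a few minutes. One way to watch the job’s status without opening the console is:

gcloud dataflow jobs list --region "Region" --status active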

Once the pipeline is streaming rows into BigQuery, we can query the table with SQL:

SELECT * FROM `"Project ID".taxirides.realtime` LIMIT 1000
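
The same query can also be issued from the shell with the bq tool; note that standard SQL has to be requested explicitly:

bq query --use_legacy_sql=false \
'SELECT * FROM `"Project ID".taxirides.realtime` LIMIT 1000'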

