Join devRant
Do all the things like
++ or -- rants, post your own rants, comment on others' rants and build your customized dev avatar
Sign Up
Pipeless API
From the creators of devRant, Pipeless lets you power real-time personalized recommendations and activity feeds using a simple API
Learn More
Related Rants
I am trying to extract data from the PubSub subscription and finally, once the data is extracted I want to do some transformation. Currently, it's in bytes format. I have tried multiple ways to extract the data in JSON format using custom schema it fails with an error
TypeError: __main__.MySchema() argument after ** must be a mapping, not str [while running 'Map to MySchema']
**readPubSub.py**
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import json
import typing
class MySchema(typing.NamedTuple):
user_id:str
event_ts:str
create_ts:str
event_id:str
ifa:str
ifv:str
country:str
chip_balance:str
game:str
user_group:str
user_condition:str
device_type:str
device_model:str
user_name:str
fb_connect:bool
is_active_event:bool
event_payload:str
TOPIC_PATH = "projects/nectar-259905/topics/events"
def run(pubsub_topic):
options = PipelineOptions(
streaming=True
)
runner = 'DirectRunner'
print("I reached before pipeline")
with beam.Pipeline(runner, options=options) as pipeline:
message=(
pipeline
| "Read from Pub/Sub topic" >> beam.io.ReadFromPubSub(subscription='projects/triple-nectar-259905/subscriptions/bq_subscribe')#.with_output_types(bytes)
| 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
| 'Map to MySchema' >> beam.Map(lambda msg: MySchema(**msg)).with_output_types(MySchema)
| "Writing to console" >> beam.Map(print))
print("I reached after pipeline")
result = message.run()
result.wait_until_finish()
run(TOPIC_PATH)
If I use it directly below
message=(
pipeline
| "Read from Pub/Sub topic" >> beam.io.ReadFromPubSub(subscription='projects/triple-nectar-259905/subscriptions/bq_subscribe')#.with_output_types(bytes)
| 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
| "Writing to console" >> beam.Map(print))
I get output as
{
'user_id': '102105290400258488',
'event_ts': '2021-05-29 20:42:52.283 UTC',
'event_id': 'Game_Request_Declined',
'ifa': '6090a6c7-4422-49b5-8757-ccfdbad',
'ifv': '3fc6eb8b4d0cf096c47e2252f41',
'country': 'US',
'chip_balance': '9140',
'game': 'gru',
'user_group': '[1, 36, 529702]',
'user_condition': '[1, 36]',
'device_type': 'phone',
'device_model': 'TCL 5007Z',
'user_name': 'Minnie',
'fb_connect': True,
'event_payload': '{"competition_type":"normal","game_started_from":"result_flow_rematch","variant":"target"}',
'is_active_event': True
}
{
'user_id': '102105290400258488',
'event_ts': '2021-05-29 20:54:38.297 UTC',
'event_id': 'Decline_Game_Request',
'ifa': '6090a6c7-4422-49b5-8757-ccfdbad',
'ifv': '3fc6eb8b4d0cf096c47e2252f41',
'country': 'US',
'chip_balance': '9905',
'game': 'gru',
'user_group': '[1, 36, 529702]',
'user_condition': '[1, 36]',
'device_type': 'phone',
'device_model': 'TCL 5007Z',
'user_name': 'Minnie',
'fb_connect': True,
'event_payload': '{"competition_type":"normal","game_started_from":"result_flow_rematch","variant":"target"}',
'is_active_event': True
}
Please let me know if I m doing something wrong while parsing the data to JSON. Also, I am looking for examples to do data masking and run some SQL within Apache Beam
question
apache beam
google-cloud-pubsub
python