0
data4
2y

I am trying to extract data from the PubSub subscription and finally, once the data is extracted I want to do some transformation. Currently, it's in bytes format. I have tried multiple ways to extract the data in JSON format using custom schema it fails with an error

TypeError: __main__.MySchema() argument after ** must be a mapping, not str [while running 'Map to MySchema']

**readPubSub.py**

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import json
import typing

class MySchema(typing.NamedTuple):
user_id:str
event_ts:str
create_ts:str
event_id:str
ifa:str
ifv:str
country:str
chip_balance:str
game:str
user_group:str
user_condition:str
device_type:str
device_model:str
user_name:str
fb_connect:bool
is_active_event:bool
event_payload:str

TOPIC_PATH = "projects/nectar-259905/topics/events"

def run(pubsub_topic):
options = PipelineOptions(
streaming=True
)
runner = 'DirectRunner'

print("I reached before pipeline")

with beam.Pipeline(runner, options=options) as pipeline:
message=(
pipeline
| "Read from Pub/Sub topic" >> beam.io.ReadFromPubSub(subscription='projects/triple-nectar-259905/subscriptions/bq_subscribe')#.with_output_types(bytes)
| 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
| 'Map to MySchema' >> beam.Map(lambda msg: MySchema(**msg)).with_output_types(MySchema)
| "Writing to console" >> beam.Map(print))

print("I reached after pipeline")
result = message.run()
result.wait_until_finish()

run(TOPIC_PATH)

If I use it directly below

message=(
pipeline
| "Read from Pub/Sub topic" >> beam.io.ReadFromPubSub(subscription='projects/triple-nectar-259905/subscriptions/bq_subscribe')#.with_output_types(bytes)
| 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
| "Writing to console" >> beam.Map(print))

I get output as

{
'user_id': '102105290400258488',
'event_ts': '2021-05-29 20:42:52.283 UTC',
'event_id': 'Game_Request_Declined',
'ifa': '6090a6c7-4422-49b5-8757-ccfdbad',
'ifv': '3fc6eb8b4d0cf096c47e2252f41',
'country': 'US',
'chip_balance': '9140',
'game': 'gru',
'user_group': '[1, 36, 529702]',
'user_condition': '[1, 36]',
'device_type': 'phone',
'device_model': 'TCL 5007Z',
'user_name': 'Minnie',
'fb_connect': True,
'event_payload': '{"competition_type":"normal","game_started_from":"result_flow_rematch","variant":"target"}',
'is_active_event': True
}

{
'user_id': '102105290400258488',
'event_ts': '2021-05-29 20:54:38.297 UTC',
'event_id': 'Decline_Game_Request',
'ifa': '6090a6c7-4422-49b5-8757-ccfdbad',
'ifv': '3fc6eb8b4d0cf096c47e2252f41',
'country': 'US',
'chip_balance': '9905',
'game': 'gru',
'user_group': '[1, 36, 529702]',
'user_condition': '[1, 36]',
'device_type': 'phone',
'device_model': 'TCL 5007Z',
'user_name': 'Minnie',
'fb_connect': True,
'event_payload': '{"competition_type":"normal","game_started_from":"result_flow_rematch","variant":"target"}',
'is_active_event': True
}

Please let me know if I m doing something wrong while parsing the data to JSON. Also, I am looking for examples to do data masking and run some SQL within Apache Beam

Comments
  • 1
    This isn't stack overflow, my dear fellow developer.
  • 2
    Looks like a perfect use case for chatgpt.
    I don‘t mean the devrant bot.
  • 2
    @Lensflare very true

    @data4 here is a response with snippet for you from a gpt4 model, I hope it helps:
    https://rentry.co/czrt6
  • 0
    Well i think most probably the problem is caused by supplying a string (message) to the MySchema constructor, that requires a mapping (a dictionary-like object that maps keys to values). You must use the json.loads() method to convert the JSON string to a Python dictionary.
Add Comment