Playing PyFlink in a Nutshell

Last week, we introduced how to build a PyFlink experimental environment; today we will use that environment to explore what PyFlink can do.

PyFlink is a general-purpose stream processing framework that abstracts stream processing into four levels:

  1. SQL
  2. Table API
  3. DataStream
  4. Stateful Stream Processing

The closer to the bottom, the more flexibility is available, but the more code needs to be written. I would like to be able to do almost everything with PyFlink, so let's get started with the basic concepts of PyFlink development from the DataStream perspective.
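To see the trade-off concretely, the squaring logic that we will later write with DataStream's map can be expressed at the SQL level in a few declarative lines. This is a minimal sketch of my own (the view name nums and the toy input are illustrative, not from the article):

from pyflink.table import EnvironmentSettings, TableEnvironment

# SQL level: declarative, no user-defined processing code
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
table = t_env.from_elements([(1,), (2,), (3,)], ['x'])  # toy bounded input
t_env.create_temporary_view('nums', table)
t_env.sql_query("SELECT x * x AS squared FROM nums").execute().print()

The same result at the DataStream level requires setting up the environment, types, and user functions by hand, which is exactly the flexibility-for-code trade-off described above.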

This article will introduce a few key points of PyFlink development with simple descriptions and examples, but will not mention the implementation details of Flink.

DataStream Concept

DataStream development follows this process:

[Figure: a pipeline flowing from a source, through processing operations, to a sink]

Basically, we get streaming data from a source, process it, and output it to somewhere.

This is expressed in PyFlink as follows.

ds = env.add_source(kafka_consumer)
ds = ds.map(transform, output_type=output_type_info)
ds.add_sink(kafka_producer)
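Since kafka_consumer and kafka_producer are not defined in this snippet, here is a self-contained variant of my own that runs the same source → map → sink shape locally (from_collection and print stand in for the Kafka connectors):

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# from_collection stands in for a real source such as a Kafka consumer
ds = env.from_collection([1, 2, 3], type_info=Types.INT())
ds = ds.map(lambda i: i * 2, output_type=Types.INT())
# print() stands in for a real sink such as a Kafka producer
ds.print()
env.execute("minimal pipeline")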

Source and sink are easy to understand; the key question is, what processing can be applied in between?

The official documentation lists all the available operations:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/operators/overview/

For the above example, we used map.

There is also an example of these operations in the Flink distribution; the code is located at ./examples/python/datastream/basic_operations.py.

What is the difference between map and flat_map?

There are two similar operations in the operation list, map and flat_map. What is the difference between them?

The difference is in the number of generated outputs.

In the case of map, an input event generates one and only one output event; on the other hand, flat_map can generate zero to many output events.

Let's use the actual code as an example.

def map_transform(i: int):
  return i * i

def flat_map_transform(i: int):
  for idx in range(i):
    yield idx

ds.map(map_transform, output_type=Types.INT())
ds.flat_map(flat_map_transform, output_type=Types.INT())

In this example, map squares all the input integers and passes them out, one input corresponds to one output. However, flat_map outputs a series of events, and the number of output events is determined by the input events.

If the input is 0, the yield in flat_map is never reached, and nothing is generated.
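To make the difference concrete, the following sketch of my own reuses the two functions above with a toy source; the expected outputs are noted in the comments:

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection([0, 1, 2, 3], type_info=Types.INT())
# map: exactly one output per input -> 0, 1, 4, 9
ds.map(map_transform, output_type=Types.INT()).print()
# flat_map: zero to many outputs per input
# 0 -> (nothing), 1 -> 0, 2 -> 0 1, 3 -> 0 1 2
ds.flat_map(flat_map_transform, output_type=Types.INT()).print()
env.execute()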

State

State is the best feature of Flink.

Although we have various operations available, many of them actually produce results based on previous events. How do we keep the previous events? This is where State comes in.

State can be considered internal storage used to persist data, and by default the total size of State is limited by the sum of every node's memory.

Nevertheless, State can be persisted to durable storage such as RocksDB for greater scalability.

from pyflink.datastream import StreamExecutionEnvironment, EmbeddedRocksDBStateBackend

env = StreamExecutionEnvironment.get_execution_environment()
env.set_state_backend(EmbeddedRocksDBStateBackend())

To use State in the Flink framework, there are two key points worth noting.

  1. State can only be used on a keyed data stream.
  2. State is scoped to a single operation and cannot be shared with other operations.

All available State types are listed in the reference below:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/fault-tolerance/state/

In fact, an example at ./examples/python/datastream/state_access.py also provides a good demonstration.
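To illustrate both points in code, here is a condensed sketch of my own (the class name CountPerKey is hypothetical): the state is registered inside a single operation, and it is only accessible after key_by.

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class CountPerKey(KeyedProcessFunction):

  def open(self, runtime_context: RuntimeContext):
      # this state belongs to this operation only; no other operation can read it
      self.count = runtime_context.get_state(ValueStateDescriptor('count', Types.LONG()))

  def process_element(self, value, ctx):
      total = (self.count.value() or 0) + 1
      self.count.update(total)
      yield value[0], total

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(
    [('a', 1), ('b', 1), ('a', 1)],
    type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
# get_state is only usable on a keyed data stream, i.e. after key_by
ds.key_by(lambda v: v[0]) \
  .process(CountPerKey(), output_type=Types.TUPLE([Types.STRING(), Types.LONG()])) \
  .print()
env.execute()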

Connect (Shared State)

As mentioned in the previous section, state is operation-scoped and cannot be shared, but sometimes we do need to combine the states of two different streams. What should we do?

Fortunately, Flink provides connect to enable us to share the states of different streams within the same job.

By using connect, we can combine two different streams within the same operation, and thereby share that operation's state.
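As a warm-up before the full program, here is a minimal runnable sketch of my own showing the shape of connect with a CoMapFunction (TagSource is a hypothetical name; the real example below uses a CoFlatMapFunction with keyed state):

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import CoMapFunction

class TagSource(CoMapFunction):
  # map1 receives events from the first stream, map2 from the second;
  # both run inside the same operation and can therefore share its state
  def map1(self, value):
      return 'stream1: ' + str(value)

  def map2(self, value):
      return 'stream2: ' + str(value)

env = StreamExecutionEnvironment.get_execution_environment()
s1 = env.from_collection([1, 2], type_info=Types.INT())
s2 = env.from_collection(['a', 'b'], type_info=Types.STRING())
s1.connect(s2).map(TagSource(), output_type=Types.STRING()).print()
env.execute()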

To be more concrete, let me provide a practical example. There are two streams:

  • Stream 1 provides a mapping between the item identifiers and the item names. When the item name changes, an event (item_id, item_name) is sent into the stream, so we just need to save the latest status.
  • Stream 2 is the transaction history, including which item was sold and the number of items ordered.

What we want is this: whenever a purchase event arrives, we accumulate the total quantity sold and enrich the event with the latest item name.

This is the classic streaming enrichment pattern, which I explained in detail in my previous article.

Here is the full program example.

import logging, sys

from pyflink.common import WatermarkStrategy, Row
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FileSink, OutputFileConfig, NumberSequenceSource
from pyflink.datastream.execution_mode import RuntimeExecutionMode
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext, MapFunction, CoMapFunction, CoFlatMapFunction
from pyflink.datastream.state import MapStateDescriptor, ValueStateDescriptor
from pyflink.common import JsonRowDeserializationSchema, JsonRowSerializationSchema
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer


class SellTotal(CoFlatMapFunction):

  def open(self, runtime_context: RuntimeContext):
      state_desc = MapStateDescriptor('map', Types.LONG(), Types.STRING())
      self.state = runtime_context.get_map_state(state_desc)

      cnt_desc = ValueStateDescriptor('cnt', Types.LONG())
      self.cnt_state = runtime_context.get_state(cnt_desc)

  # (id, name)
  def flat_map1(self, value):
      self.state.put(value[0], value[1])
      #return Row(value[0], f"update {value[1]}", 0)

  # (id, cnt)
  def flat_map2(self, value):
      cnt = self.cnt_state.value() or 0
      total = cnt + value[1]
      self.cnt_state.update(total)

      if not self.state.contains(value[0]):
          name = "NONAME"
      else:
          name = self.state.get(value[0])

      #return Row(value[0], name, total)
      yield Row(value[0], name, total)


def sell_total_demo(env):
  type_info1 = Types.ROW([Types.LONG(), Types.STRING()])
  ds1 = env.from_collection(
      [(1, 'apple'), (2, 'banana'), (3, 'cherry'), (4, 'durian'), (6, 'fig'), (7, 'grape')],
      type_info=type_info1)

  type_info2 = Types.ROW([Types.LONG(), Types.LONG()])
  ds2 = env.from_collection(
      [(1, 1), (2, 3), (3, 5), (1, 5), (5, 100), (6, 66), (1, 10)],
      type_info=type_info2)

  output_type_info = Types.ROW([Types.LONG(), Types.STRING(), Types.LONG()])
  serialization_schema = JsonRowSerializationSchema.Builder() \
      .with_type_info(output_type_info) \
      .build()
  kafka_producer = FlinkKafkaProducer(
      topic='TempResults',
      serialization_schema=serialization_schema,
      producer_config={'bootstrap.servers': 'kafka:9092', 'group.id': 'test_group'}
  )

  connected_ds = ds1.connect(ds2)
  connected_ds.key_by(lambda a: a[0], lambda a: a[0]).flat_map(SellTotal(), output_type_info).add_sink(kafka_producer)

  env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:////home/ec2-user/flink-1.15.2/opt/flink-sql-connector-kafka-1.15.2.jar")

    print("connected demo gogo")
    sell_total_demo(env)

flat_map1 handles stream 1; that is, it maintains the mapping between item id and item name, so this stream does not need to generate any output events.

The core of the whole application is flat_map2. We read the accumulated quantity from self.cnt_state, add the new quantity, and write the total back into the state. Then we look up the corresponding name in self.state and output the enriched event.
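For reference, assuming the name mappings from stream 1 are processed before the transactions with the same id (the arrival order between two connected streams is not guaranteed), the sink should receive events along these lines (a trace I derived from the code, not output shown in the article):

(1, 'apple', 1)
(2, 'banana', 3)
(3, 'cherry', 5)
(1, 'apple', 6)
(5, 'NONAME', 100)
(6, 'fig', 66)
(1, 'apple', 16)

Note that the counter accumulates per key, which is why id 1 goes 1 → 6 → 16, and id 5 falls back to NONAME because it never appears in stream 1.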

Conclusion

In this final example, we demonstrated operations, state, and the merging of streams.

From this example, it is easy to see that Flink can handle almost anything we need, as long as we structure the program correctly.

We will continue experimenting with stream processing and will publish further updates as we go.


Original Link: https://dev.to/lazypro/playing-pyflink-in-a-nutshell-30oa
