google cloud dataflow - Odd behaviour of SlidingWindows when used with TestPipeline -


i've got simple test demonstrate odd behaviour of sliding window when used testpipeline. bunch of strings fed input, accumulated in sliding window, sum aggregation applied count duplicates , output of aggregation function logged. sliding window of 10 minutes duration , 5 minutes period expected 1 window being used store elements (as new 1 started in 5 minutes after first one)...

public class slidingwindowtest {     private static pipelineoptions options = pipelineoptionsfactory.create();     private static final logger log = loggerfactory.getlogger(slidingwindowtest.class);      private static class identitydofn extends dofn<kv<string, integer>, kv<string, integer>>         implements dofn.requireswindowaccess{         @override         public void processelement(processcontext processcontext) throws exception {             kv<string, integer> item = processcontext.element();             log.info("~~~~~~~~~~> {} => {}", item.getkey(), item.getvalue());             log.info("~~~~~~~~~~~ {}", processcontext.window());             processcontext.output(item);         }     }      @test     public void whatswrongwithslidingwindow() {         pipeline p = testpipeline.create(options);          p.apply(create.of("cab", "abc", "a1b2c3", "abc", "a1b2c3"))             .apply(mapelements.via((string item) -> kv.of(item, 1))                        .withoutputtype(new typedescriptor<kv<string, integer>>() {}))             .apply(window.<kv<string, integer>>into(slidingwindows.of(duration.standardminutes(10))                                                         .every(duration.standardminutes(5))))             .apply(sum.integersperkey())             .apply(pardo.of(new identitydofn()));          p.run();     } } 

but got 8 windows being fired instead. there wrong testpipeline or understanding of how sliding windows supposed work?

12:19:04.566 [main] debug c.g.c.d.sdk.coders.coderregistry - default coder com.google.cloud.dataflow.sdk.values.kv<java.lang.string, java.lang.integer>: kvcoder(stringutf8coder, varintcoder) 12:19:04.566 [main] info  c.q.m.core.slidingwindowtest - ~~~~~~~~~~> abc => 2 12:19:04.567 [main] info  c.q.m.core.slidingwindowtest - ~~~~~~~~~~~ [-290308-12-21t19:50:00.000z..-290308-12-21t20:00:00.000z) 12:19:04.567 [main] info  c.q.m.core.slidingwindowtest - ~~~~~~~~~~> abc => 2 12:19:04.567 [main] info  c.q.m.core.slidingwindowtest - ~~~~~~~~~~~ [-290308-12-21t19:55:00.000z..-290308-12-21t20:05:00.000z) 12:19:04.567 [main] info  c.q.m.core.slidingwindowtest - ~~~~~~~~~~> a1b2c3 => 2 12:19:04.567 [main] info  c.q.m.core.slidingwindowtest - ~~~~~~~~~~~ [-290308-12-21t20:00:00.000z..-290308-12-21t20:10:00.000z) 12:19:04.567 [main] info  c.q.m.core.slidingwindowtest - ~~~~~~~~~~> cab => 1 12:19:04.568 [main] info  c.q.m.core.slidingwindowtest - ~~~~~~~~~~~ [-290308-12-21t19:50:00.000z..-290308-12-21t20:00:00.000z) 12:19:04.568 [main] info  c.q.m.core.slidingwindowtest - ~~~~~~~~~~> a1b2c3 => 2 12:19:04.568 [main] info  c.q.m.core.slidingwindowtest - ~~~~~~~~~~~ [-290308-12-21t19:50:00.000z..-290308-12-21t20:00:00.000z) 12:19:04.568 [main] info  c.q.m.core.slidingwindowtest - ~~~~~~~~~~> cab => 1 12:19:04.568 [main] info  c.q.m.core.slidingwindowtest - ~~~~~~~~~~~ [-290308-12-21t19:55:00.000z..-290308-12-21t20:05:00.000z) 12:19:04.568 [main] info  c.q.m.core.slidingwindowtest - ~~~~~~~~~~> abc => 2 12:19:04.568 [main] info  c.q.m.core.slidingwindowtest - ~~~~~~~~~~~ [-290308-12-21t20:00:00.000z..-290308-12-21t20:10:00.000z) 12:19:04.568 [main] info  c.q.m.core.slidingwindowtest - ~~~~~~~~~~> cab => 1 12:19:04.568 [main] info  c.q.m.core.slidingwindowtest - ~~~~~~~~~~~ [-290308-12-21t20:00:00.000z..-290308-12-21t20:10:00.000z) 

p/s: dataflow sdk version: 1.8.0

the expected behavior different observe, different expect:

  • first, have 3 different keys, if fell single window, expect 3 outputs.
  • for sliding windows of 10 minutes 5 minute period, every element falls 2 windows. if element arrives @ minute 1 falls both window 0 10 window -5 5. should expect six output values, 2 per key. common pitfall think of windows updates pipeline runs, when in fact calculated properties of input data, not property of arrival time or pipeline's execution.
  • the create transform output values timestamp of boundedwindow.timestamp_min_value should fall same 2 windows.

your example seems indicate real bug. should not possible "a1b2c3" in 2 disjoint windows falls in, nor "abc" fall 3 windows, 2 of disjoint.

incidentally, though, benefit checking out dataflowassert (called passert in beam) testing contents of pcollection in consistent , cross-runner way.


Comments

Popular posts from this blog

asynchronous - C# WinSCP .NET assembly: How to upload multiple files asynchronously -

aws api gateway - SerializationException in posting new Records via Dynamodb Proxy Service in API -

asp.net - Problems sending emails from forum -