Google Cloud Dataflow - Odd behaviour of SlidingWindows when used with TestPipeline
I've got a simple test to demonstrate the odd behaviour of a sliding window when used with TestPipeline. A bunch of strings is fed as input and accumulated in a sliding window, a sum aggregation is applied to count duplicates, and the output of the aggregation function is logged. With a sliding window of 10 minutes duration and 5 minutes period, I expected only 1 window to be used to store the elements (as a new one is started 5 minutes after the first one)...
    public class SlidingWindowTest {

        private static PipelineOptions options = PipelineOptionsFactory.create();
        private static final Logger log = LoggerFactory.getLogger(SlidingWindowTest.class);

        private static class IdentityDoFn extends DoFn<KV<String, Integer>, KV<String, Integer>>
                implements DoFn.RequiresWindowAccess {

            @Override
            public void processElement(ProcessContext processContext) throws Exception {
                KV<String, Integer> item = processContext.element();
                log.info("~~~~~~~~~~> {} => {}", item.getKey(), item.getValue());
                log.info("~~~~~~~~~~~ {}", processContext.window());
                processContext.output(item);
            }
        }

        @Test
        public void whatsWrongWithSlidingWindow() {
            Pipeline p = TestPipeline.create(options);
            p.apply(Create.of("cab", "abc", "a1b2c3", "abc", "a1b2c3"))
             .apply(MapElements.via((String item) -> KV.of(item, 1))
                     .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
             .apply(Window.<KV<String, Integer>>into(SlidingWindows.of(Duration.standardMinutes(10))
                     .every(Duration.standardMinutes(5))))
             .apply(Sum.integersPerKey())
             .apply(ParDo.of(new IdentityDoFn()));
            p.run();
        }
    }
But I got 8 windows being fired instead. Is there something wrong with TestPipeline, or with my understanding of how sliding windows are supposed to work?
    12:19:04.566 [main] DEBUG c.g.c.d.sdk.coders.CoderRegistry - Default coder for com.google.cloud.dataflow.sdk.values.KV<java.lang.String, java.lang.Integer>: KvCoder(StringUtf8Coder, VarIntCoder)
    12:19:04.566 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> abc => 2
    12:19:04.567 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:50:00.000Z..-290308-12-21T20:00:00.000Z)
    12:19:04.567 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> abc => 2
    12:19:04.567 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:55:00.000Z..-290308-12-21T20:05:00.000Z)
    12:19:04.567 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> a1b2c3 => 2
    12:19:04.567 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T20:00:00.000Z..-290308-12-21T20:10:00.000Z)
    12:19:04.567 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> cab => 1
    12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:50:00.000Z..-290308-12-21T20:00:00.000Z)
    12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> a1b2c3 => 2
    12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:50:00.000Z..-290308-12-21T20:00:00.000Z)
    12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> cab => 1
    12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:55:00.000Z..-290308-12-21T20:05:00.000Z)
    12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> abc => 2
    12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T20:00:00.000Z..-290308-12-21T20:10:00.000Z)
    12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> cab => 1
    12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T20:00:00.000Z..-290308-12-21T20:10:00.000Z)
P.S. Dataflow SDK version: 1.8.0
The expected behavior is different from what you observe, but also different from what you expect:
- First, you have three different keys, so if they all fell into a single window, you should expect three outputs.
- For sliding windows of 10 minutes with a 5 minute period, every element falls into two windows. If an element arrives at minute 1, it falls into both the window from 0 to 10 and the window from -5 to 5. So you should expect six output values, two per key. A common pitfall is to think of windows as updates as the pipeline runs, when in fact they are calculated properties of the input data, not a property of arrival time or of the pipeline's execution.
- The Create transform will output all values with a timestamp of BoundedWindow.TIMESTAMP_MIN_VALUE, so they should all fall into the same two windows.
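To make the second point concrete, here is a minimal plain-Java sketch of the window assignment arithmetic. It is not the SDK's implementation: the class name SlidingWindowAssignment, the method windowsFor, and the use of whole minutes as the time unit are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowAssignment {

    /**
     * Returns every half-open window [start, end) of the given size, with
     * starts aligned to multiples of period, that contains the timestamp.
     * All values are in the same unit (minutes in this sketch).
     */
    static List<long[]> windowsFor(long timestamp, long size, long period) {
        List<long[]> windows = new ArrayList<>();
        // Latest aligned window start at or before the timestamp.
        // floorMod keeps the result correct for negative timestamps too.
        long lastStart = timestamp - Math.floorMod(timestamp, period);
        // Step back one period at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= period) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // An element at minute 1, with 10 minute windows starting every 5 minutes.
        for (long[] w : windowsFor(1, 10, 5)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

For minute 1 this prints [0, 10) and then [-5, 5), the two windows described above. The number of windows per element is size divided by period, which is why two outputs per key are expected here.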
Your example seems to indicate a real bug. It should not be possible for "a1b2c3" to be in the two disjoint windows it falls in, nor for "abc" to fall into three windows, two of which are disjoint.
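The disjointness argument can be checked mechanically. The sketch below is an illustration only: the class and method names are hypothetical, and the three windows from the log are encoded as minutes after 19:00Z on the logged date. Since every element produced by Create carries the same timestamp, all windows a key appears in would have to share at least one instant.

```java
public class WindowOverlapCheck {

    /** Returns true if some single timestamp lies in every half-open [start, end) window given. */
    static boolean shareATimestamp(long[][] windows) {
        long latestStart = Long.MIN_VALUE;
        long earliestEnd = Long.MAX_VALUE;
        for (long[] w : windows) {
            latestStart = Math.max(latestStart, w[0]);
            earliestEnd = Math.min(earliestEnd, w[1]);
        }
        // The intersection is [latestStart, earliestEnd); it is non-empty only if start < end.
        return latestStart < earliestEnd;
    }

    public static void main(String[] args) {
        // Minutes after 19:00Z (an encoding chosen for this sketch).
        long[] w1 = {50, 60}; // [19:50, 20:00)
        long[] w2 = {55, 65}; // [19:55, 20:05)
        long[] w3 = {60, 70}; // [20:00, 20:10)

        // The two windows a correctly assigned element would fall into.
        System.out.println(shareATimestamp(new long[][] {w1, w2}));
        // The two windows logged for "a1b2c3".
        System.out.println(shareATimestamp(new long[][] {w1, w3}));
        // The three windows logged for "abc".
        System.out.println(shareATimestamp(new long[][] {w1, w2, w3}));
    }
}
```

Only the first combination has a common instant. The window sets logged for "a1b2c3" and "abc" have none, so no single timestamp could have produced them.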
Incidentally, though, you may benefit from checking out DataflowAssert (called PAssert in Beam) for testing the contents of a PCollection in a consistent and cross-runner way.