1 package backtype.storm;
2
3 import backtype.storm.ConfigValidation;
4 import backtype.storm.serialization.IKryoDecorator;
5 import backtype.storm.serialization.IKryoFactory;
6 import com.esotericsoftware.kryo.Serializer;
7 import java.util.ArrayList;
8 import java.util.HashMap;
9 import java.util.List;
10 import java.util.Map;
11
12 /**
13 * Topology configs are specified as a plain old map. This class provides a
14 * convenient way to create a topology config map by providing setter methods for
15 * all the configs that can be set. It also makes it easier to do things like add
16 * serializations.
17 *
18 * <p>This class also provides constants for all the configurations possible on
19 * a Storm cluster and Storm topology. Each constant is paired with a schema
20 * that defines the validity criterion of the corresponding field. Default
21 * values for these configs can be found in defaults.yaml.</p>
22 *
23 * <p>Note that you may put other configurations in any of the configs. Storm
24 * will ignore anything it doesn't recognize, but your topologies are free to make
25 * use of them by reading them in the prepare method of Bolts or the open method of
26 * Spouts.</p>
27 */
28 public class Config extends HashMap<String, Object> {
29 /**
30 * The transporter for communication among Storm tasks.
31 */
32 public static final String STORM_MESSAGING_TRANSPORT = "storm.messaging.transport";
33 public static final Object STORM_MESSAGING_TRANSPORT_SCHEMA = String.class;
34
35 /**
36 * Netty-based messaging: The buffer size for the send/recv buffers.
37 */
38 public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size";
39 public static final Object STORM_MESSAGING_NETTY_BUFFER_SIZE_SCHEMA = Number.class;
40
41 /**
42 * Netty-based messaging: The max # of retries that a peer will perform when a remote is not accessible.
43 */
44 public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = "storm.messaging.netty.max_retries";
45 public static final Object STORM_MESSAGING_NETTY_MAX_RETRIES_SCHEMA = Number.class;
46
47 /**
48 * Netty-based messaging: The min # of milliseconds that a peer will wait. Note that the config key ends in "min_wait_ms" although the constant is named MIN_SLEEP_MS.
49 */
50 public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms";
51 public static final Object STORM_MESSAGING_NETTY_MIN_SLEEP_MS_SCHEMA = Number.class;
52
53 /**
54 * Netty-based messaging: The max # of milliseconds that a peer will wait. Note that the config key ends in "max_wait_ms" although the constant is named MAX_SLEEP_MS.
55 */
56 public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms";
57 public static final Object STORM_MESSAGING_NETTY_MAX_SLEEP_MS_SCHEMA = Number.class;
58
59 /**
60 * Netty-based messaging: The # of worker threads for the server.
61 */
62 public static final String STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS = "storm.messaging.netty.server_worker_threads";
63 public static final Object STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS_SCHEMA = Number.class;
64
65 /**
66 * Netty-based messaging: The # of worker threads for the client.
67 */
68 public static final String STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS = "storm.messaging.netty.client_worker_threads";
69 public static final Object STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS_SCHEMA = Number.class;
70
71 /**
72 * A list of hosts of ZooKeeper servers used to manage the cluster.
73 */
74 public static final String STORM_ZOOKEEPER_SERVERS = "storm.zookeeper.servers";
75 public static final Object STORM_ZOOKEEPER_SERVERS_SCHEMA = ConfigValidation.StringsValidator;
76
77 /**
78 * The port Storm will use to connect to each of the ZooKeeper servers.
79 */
80 public static final String STORM_ZOOKEEPER_PORT = "storm.zookeeper.port";
81 public static final Object STORM_ZOOKEEPER_PORT_SCHEMA = Number.class;
82
83 /**
84 * A directory on the local filesystem used by Storm for any local
85 * filesystem usage it needs. The directory must exist and the Storm daemons must
86 * have permission to read/write from this location.
87 */
88 public static final String STORM_LOCAL_DIR = "storm.local.dir";
89 public static final Object STORM_LOCAL_DIR_SCHEMA = String.class;
90
91 /**
92 * A global task scheduler used to assign topologies' tasks to supervisors' workers.
93 *
94 * <p>If this is not set, a default system scheduler will be used.</p>
95 */
96 public static final String STORM_SCHEDULER = "storm.scheduler";
97 public static final Object STORM_SCHEDULER_SCHEMA = String.class;
98
99 /**
100 * The mode this Storm cluster is running in. Either "distributed" or "local".
101 */
102 public static final String STORM_CLUSTER_MODE = "storm.cluster.mode";
103 public static final Object STORM_CLUSTER_MODE_SCHEMA = String.class;
104
105 /**
106 * The hostname the supervisors/workers should report to nimbus. If unset, Storm will
107 * get the hostname to report by calling <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
108 *
109 * <p>You should set this config when you don't have a DNS which supervisors/workers
110 * can utilize to find each other based on the hostname obtained from calls to
111 * <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.</p>
112 */
113 public static final String STORM_LOCAL_HOSTNAME = "storm.local.hostname";
114 public static final Object STORM_LOCAL_HOSTNAME_SCHEMA = String.class;
115
116 /**
117 * The transport plug-in for Thrift client/server communication.
118 */
119 public static final String STORM_THRIFT_TRANSPORT_PLUGIN = "storm.thrift.transport";
120 public static final Object STORM_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class;
121
122 /**
123 * The serializer class for ListDelegate (tuple payload).
124 * The default serializer will be ListDelegateSerializer.
125 */
126 public static final String TOPOLOGY_TUPLE_SERIALIZER = "topology.tuple.serializer";
127 public static final Object TOPOLOGY_TUPLE_SERIALIZER_SCHEMA = String.class;
128
129 /**
130 * Whether or not to use ZeroMQ for messaging in local mode. If this is set
131 * to false, then Storm will use a pure-Java messaging system. The purpose
132 * of this flag is to make it easy to run Storm in local mode by eliminating
133 * the need for native dependencies, which can be difficult to install.
134 *
135 * <p>Defaults to false.</p>
136 */
137 public static final String STORM_LOCAL_MODE_ZMQ = "storm.local.mode.zmq";
138 public static final Object STORM_LOCAL_MODE_ZMQ_SCHEMA = Boolean.class;
139
140 /**
141 * The root location at which Storm stores data in ZooKeeper.
142 */
143 public static final String STORM_ZOOKEEPER_ROOT = "storm.zookeeper.root";
144 public static final Object STORM_ZOOKEEPER_ROOT_SCHEMA = String.class;
145
146 /**
147 * The session timeout for clients to ZooKeeper.
148 */
149 public static final String STORM_ZOOKEEPER_SESSION_TIMEOUT = "storm.zookeeper.session.timeout";
150 public static final Object STORM_ZOOKEEPER_SESSION_TIMEOUT_SCHEMA = Number.class;
151
152 /**
153 * The connection timeout for clients to ZooKeeper.
154 */
155 public static final String STORM_ZOOKEEPER_CONNECTION_TIMEOUT = "storm.zookeeper.connection.timeout";
156 public static final Object STORM_ZOOKEEPER_CONNECTION_TIMEOUT_SCHEMA = Number.class;
157
158 /**
159 * The number of times to retry a ZooKeeper operation.
160 */
161 public static final String STORM_ZOOKEEPER_RETRY_TIMES="storm.zookeeper.retry.times";
162 public static final Object STORM_ZOOKEEPER_RETRY_TIMES_SCHEMA = Number.class;
163
164 /**
165 * The interval between retries of a ZooKeeper operation.
166 */
167 public static final String STORM_ZOOKEEPER_RETRY_INTERVAL="storm.zookeeper.retry.interval";
168 public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_SCHEMA = Number.class;
169
170 /**
171 * The ceiling of the interval between retries of a ZooKeeper operation.
172 */
173 public static final String STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING="storm.zookeeper.retry.intervalceiling.millis";
174 public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING_SCHEMA = Number.class;
175
176 /**
177 * The ZooKeeper authentication scheme to use, e.g. "digest". Defaults to no authentication.
178 */
179 public static final String STORM_ZOOKEEPER_AUTH_SCHEME="storm.zookeeper.auth.scheme";
180 public static final Object STORM_ZOOKEEPER_AUTH_SCHEME_SCHEMA = String.class;
181
182 /**
183 * A string representing the payload for ZooKeeper authentication. It gets serialized using UTF-8 encoding during authentication.
184 */
185 public static final String STORM_ZOOKEEPER_AUTH_PAYLOAD="storm.zookeeper.auth.payload";
186 public static final Object STORM_ZOOKEEPER_AUTH_PAYLOAD_SCHEMA = String.class;
187
188 /**
189 * The id assigned to a running topology. The id is the storm name with a unique nonce appended.
190 */
191 public static final String STORM_ID = "storm.id";
192 public static final Object STORM_ID_SCHEMA = String.class;
193
194 /**
195 * The host that the master server is running on.
196 */
197 public static final String NIMBUS_HOST = "nimbus.host";
198 public static final Object NIMBUS_HOST_SCHEMA = String.class;
199
200 /**
201 * Which port the Thrift interface of Nimbus should run on. Clients should
202 * connect to this port to upload jars and submit topologies.
203 */
204 public static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port";
205 public static final Object NIMBUS_THRIFT_PORT_SCHEMA = Number.class;
206
207
208 /**
209 * This parameter is used by the storm-deploy project to configure the
210 * JVM options for the nimbus daemon.
211 */
212 public static final String NIMBUS_CHILDOPTS = "nimbus.childopts";
213 public static final Object NIMBUS_CHILDOPTS_SCHEMA = String.class;
214
215
216 /**
217 * How long without heartbeating a task can go before nimbus will consider the
218 * task dead and reassign it to another location.
219 */
220 public static final String NIMBUS_TASK_TIMEOUT_SECS = "nimbus.task.timeout.secs";
221 public static final Object NIMBUS_TASK_TIMEOUT_SECS_SCHEMA = Number.class;
222
223
224 /**
225 * How often nimbus should wake up to check heartbeats and do reassignments. Note
226 * that if a machine ever goes down Nimbus will immediately wake up and take action.
227 * This parameter is for checking for failures when there's no explicit event like that
228 * occurring.
229 */
230 public static final String NIMBUS_MONITOR_FREQ_SECS = "nimbus.monitor.freq.secs";
231 public static final Object NIMBUS_MONITOR_FREQ_SECS_SCHEMA = Number.class;
232
233 /**
234 * How often nimbus should wake the cleanup thread to clean the inbox.
235 * @see #NIMBUS_INBOX_JAR_EXPIRATION_SECS
236 */
237 public static final String NIMBUS_CLEANUP_INBOX_FREQ_SECS = "nimbus.cleanup.inbox.freq.secs";
238 public static final Object NIMBUS_CLEANUP_INBOX_FREQ_SECS_SCHEMA = Number.class;
239
240 /**
241 * The length of time a jar file lives in the inbox before being deleted by the cleanup thread.
242 *
243 * <p>Probably keep this value greater than or equal to NIMBUS_CLEANUP_INBOX_FREQ_SECS.
244 * Note that the time it takes to delete an inbox jar file is going to be somewhat more than
245 * NIMBUS_INBOX_JAR_EXPIRATION_SECS (depending on what NIMBUS_CLEANUP_INBOX_FREQ_SECS
246 * is set to).</p>
247 * @see #NIMBUS_CLEANUP_INBOX_FREQ_SECS
248 */
249 public static final String NIMBUS_INBOX_JAR_EXPIRATION_SECS = "nimbus.inbox.jar.expiration.secs";
250 public static final Object NIMBUS_INBOX_JAR_EXPIRATION_SECS_SCHEMA = Number.class;
251
252 /**
253 * How long a supervisor can go without heartbeating before nimbus considers it dead
254 * and stops assigning new work to it.
255 */
256 public static final String NIMBUS_SUPERVISOR_TIMEOUT_SECS = "nimbus.supervisor.timeout.secs";
257 public static final Object NIMBUS_SUPERVISOR_TIMEOUT_SECS_SCHEMA = Number.class;
258
259 /**
260 * A special timeout used when a task is initially launched. During launch, this is the timeout
261 * used until the first heartbeat, overriding nimbus.task.timeout.secs.
262 *
263 * <p>A separate timeout exists for launch because there can be quite a bit of overhead
264 * to launching new JVMs and configuring them.</p>
265 */
266 public static final String NIMBUS_TASK_LAUNCH_SECS = "nimbus.task.launch.secs";
267 public static final Object NIMBUS_TASK_LAUNCH_SECS_SCHEMA = Number.class;
268
269 /**
270 * Whether or not nimbus should reassign tasks if it detects that a task goes down.
271 * Defaults to true, and it's not recommended to change this value.
272 */
273 public static final String NIMBUS_REASSIGN = "nimbus.reassign";
274 public static final Object NIMBUS_REASSIGN_SCHEMA = Boolean.class;
275
276 /**
277 * During upload/download with the master, how long an upload or download connection can be idle
278 * before nimbus considers it dead and drops the connection.
279 */
280 public static final String NIMBUS_FILE_COPY_EXPIRATION_SECS = "nimbus.file.copy.expiration.secs";
281 public static final Object NIMBUS_FILE_COPY_EXPIRATION_SECS_SCHEMA = Number.class;
282
283 /**
284 * A custom class that implements ITopologyValidator that is run whenever a
285 * topology is submitted. Can be used to provide business-specific logic for
286 * whether topologies are allowed to run or not.
287 */
288 public static final String NIMBUS_TOPOLOGY_VALIDATOR = "nimbus.topology.validator";
289 public static final Object NIMBUS_TOPOLOGY_VALIDATOR_SCHEMA = String.class;
290
291 /**
292 * Class name for authorization plugin for Nimbus.
293 */
294 public static final String NIMBUS_AUTHORIZER = "nimbus.authorizer";
295 public static final Object NIMBUS_AUTHORIZER_SCHEMA = String.class;
296
297 /**
298 * Storm UI binds to this port.
299 */
300 public static final String UI_PORT = "ui.port";
301 public static final Object UI_PORT_SCHEMA = Number.class;
302
303 /**
304 * HTTP UI port for log viewer.
305 */
306 public static final String LOGVIEWER_PORT = "logviewer.port";
307 public static final Object LOGVIEWER_PORT_SCHEMA = Number.class;
308
309 /**
310 * Childopts for log viewer Java process.
311 */
312 public static final String LOGVIEWER_CHILDOPTS = "logviewer.childopts";
313 public static final Object LOGVIEWER_CHILDOPTS_SCHEMA = String.class;
314
315 /**
316 * Appender name used by log viewer to determine log directory.
317 */
318 public static final String LOGVIEWER_APPENDER_NAME = "logviewer.appender.name";
319 public static final Object LOGVIEWER_APPENDER_NAME_SCHEMA = String.class;
320
321 /**
322 * Childopts for Storm UI Java process.
323 */
324 public static final String UI_CHILDOPTS = "ui.childopts";
325 public static final Object UI_CHILDOPTS_SCHEMA = String.class;
326
327 /**
328 * List of DRPC servers so that the DRPCSpout knows who to talk to.
329 */
330 public static final String DRPC_SERVERS = "drpc.servers";
331 public static final Object DRPC_SERVERS_SCHEMA = ConfigValidation.StringsValidator;
332
333 /**
334 * This port is used by Storm DRPC for receiving DRPC requests from clients.
335 */
336 public static final String DRPC_PORT = "drpc.port";
337 public static final Object DRPC_PORT_SCHEMA = Number.class;
338
339 /**
340 * DRPC thrift server worker threads.
341 */
342 public static final String DRPC_WORKER_THREADS = "drpc.worker.threads";
343 public static final Object DRPC_WORKER_THREADS_SCHEMA = Number.class;
344
345 /**
346 * DRPC thrift server queue size.
347 */
348 public static final String DRPC_QUEUE_SIZE = "drpc.queue.size";
349 public static final Object DRPC_QUEUE_SIZE_SCHEMA = Number.class;
350
351 /**
352 * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
353 */
354 public static final String DRPC_INVOCATIONS_PORT = "drpc.invocations.port";
355 public static final Object DRPC_INVOCATIONS_PORT_SCHEMA = Number.class;
356
357 /**
358 * The timeout on DRPC requests within the DRPC server. Defaults to 10 minutes. Note that requests can also
359 * time out based on the socket timeout on the DRPC client, and separately based on the topology message
360 * timeout for the topology implementing the DRPC function.
361 */
362 public static final String DRPC_REQUEST_TIMEOUT_SECS = "drpc.request.timeout.secs";
363 public static final Object DRPC_REQUEST_TIMEOUT_SECS_SCHEMA = Number.class;
364
365 /**
366 * Childopts for Storm DRPC Java process.
367 */
368 public static final String DRPC_CHILDOPTS = "drpc.childopts";
369 public static final Object DRPC_CHILDOPTS_SCHEMA = String.class;
370
371 /**
372 * The metadata configured on the supervisor.
373 */
374 public static final String SUPERVISOR_SCHEDULER_META = "supervisor.scheduler.meta";
375 public static final Object SUPERVISOR_SCHEDULER_META_SCHEMA = Map.class;
376 /**
377 * A list of ports that can run workers on this supervisor. Each worker uses one port, and
378 * the supervisor will only run one worker per port. Use this configuration to tune
379 * how many workers run on each machine.
380 */
381 public static final String SUPERVISOR_SLOTS_PORTS = "supervisor.slots.ports";
382 public static final Object SUPERVISOR_SLOTS_PORTS_SCHEMA = ConfigValidation.NumbersValidator;
383
384
385 /**
386 * This parameter is used by the storm-deploy project to configure the
387 * JVM options for the supervisor daemon.
388 */
389 public static final String SUPERVISOR_CHILDOPTS = "supervisor.childopts";
390 public static final Object SUPERVISOR_CHILDOPTS_SCHEMA = String.class;
391
392
393 /**
394 * How long a worker can go without heartbeating before the supervisor tries to
395 * restart the worker process.
396 */
397 public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";
398 public static final Object SUPERVISOR_WORKER_TIMEOUT_SECS_SCHEMA = Number.class;
399
400
401 /**
402 * How long a worker can go without heartbeating during the initial launch before
403 * the supervisor tries to restart the worker process. This value overrides
404 * supervisor.worker.timeout.secs during launch because there is additional
405 * overhead to starting and configuring the JVM on launch.
406 */
407 public static final String SUPERVISOR_WORKER_START_TIMEOUT_SECS = "supervisor.worker.start.timeout.secs";
408 public static final Object SUPERVISOR_WORKER_START_TIMEOUT_SECS_SCHEMA = Number.class;
409
410
411 /**
412 * Whether or not the supervisor should launch workers assigned to it. Defaults
413 * to true -- and you should probably never change this value. This configuration
414 * is used in the Storm unit tests.
415 */
416 public static final String SUPERVISOR_ENABLE = "supervisor.enable";
417 public static final Object SUPERVISOR_ENABLE_SCHEMA = Boolean.class;
418
419
420 /**
421 * How often the supervisor sends a heartbeat to the master.
422 */
423 public static final String SUPERVISOR_HEARTBEAT_FREQUENCY_SECS = "supervisor.heartbeat.frequency.secs";
424 public static final Object SUPERVISOR_HEARTBEAT_FREQUENCY_SECS_SCHEMA = Number.class;
425
426
427 /**
428 * How often the supervisor checks the worker heartbeats to see if any of them
429 * need to be restarted.
430 */
431 public static final String SUPERVISOR_MONITOR_FREQUENCY_SECS = "supervisor.monitor.frequency.secs";
432 public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = Number.class;
433
434 /**
435 * The JVM opts provided to workers launched by this supervisor. All "%ID%" substrings are replaced
436 * with an identifier for this worker.
437 */
438 public static final String WORKER_CHILDOPTS = "worker.childopts";
439 public static final Object WORKER_CHILDOPTS_SCHEMA = String.class;
440
441
442 /**
443 * How often this worker should heartbeat to the supervisor.
444 */
445 public static final String WORKER_HEARTBEAT_FREQUENCY_SECS = "worker.heartbeat.frequency.secs";
446 public static final Object WORKER_HEARTBEAT_FREQUENCY_SECS_SCHEMA = Number.class;
447
448 /**
449 * How often a task should heartbeat its status to the master.
450 */
451 public static final String TASK_HEARTBEAT_FREQUENCY_SECS = "task.heartbeat.frequency.secs";
452 public static final Object TASK_HEARTBEAT_FREQUENCY_SECS_SCHEMA = Number.class;
453
454
455 /**
456 * How often a task should sync its connections with other tasks (if a task is
457 * reassigned, the other tasks sending messages to it need to refresh their connections).
458 * In general though, when a reassignment happens other tasks will be notified
459 * almost immediately. This configuration is here just in case that notification doesn't
460 * come through.
461 */
462 public static final String TASK_REFRESH_POLL_SECS = "task.refresh.poll.secs";
463 public static final Object TASK_REFRESH_POLL_SECS_SCHEMA = Number.class;
464
465
466
467 /**
468 * True if Storm should time out messages or not. Defaults to true. This is meant to be used
469 * in unit tests to prevent tuples from being accidentally timed out during the test.
470 */
471 public static final String TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS = "topology.enable.message.timeouts";
472 public static final Object TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS_SCHEMA = Boolean.class;
473
474 /**
475 * When set to true, Storm will log every message that's emitted.
476 */
477 public static final String TOPOLOGY_DEBUG = "topology.debug";
478 public static final Object TOPOLOGY_DEBUG_SCHEMA = Boolean.class;
479
480
481 /**
482 * Whether or not the master should optimize topologies by running multiple
483 * tasks in a single thread where appropriate.
484 */
485 public static final String TOPOLOGY_OPTIMIZE = "topology.optimize";
486 public static final Object TOPOLOGY_OPTIMIZE_SCHEMA = Boolean.class;
487
488 /**
489 * How many processes should be spawned around the cluster to execute this
490 * topology. Each process will execute some number of tasks as threads within
491 * them. This parameter should be used in conjunction with the parallelism hints
492 * on each component in the topology to tune the performance of a topology.
493 */
494 public static final String TOPOLOGY_WORKERS = "topology.workers";
495 public static final Object TOPOLOGY_WORKERS_SCHEMA = Number.class;
496
497 /**
498 * How many instances to create for a spout/bolt. A task runs on a thread with zero or more
499 * other tasks for the same spout/bolt. The number of tasks for a spout/bolt is always
500 * the same throughout the lifetime of a topology, but the number of executors (threads) for
501 * a spout/bolt can change over time. This allows a topology to scale to more or less resources
502 * without redeploying the topology or violating the constraints of Storm (such as a fields grouping
503 * guaranteeing that the same value goes to the same task).
504 */
505 public static final String TOPOLOGY_TASKS = "topology.tasks";
506 public static final Object TOPOLOGY_TASKS_SCHEMA = Number.class;
507
508 /**
509 * How many executors to spawn for ackers.
510 *
511 * <p>If this is set to 0, then Storm will immediately ack tuples as soon
512 * as they come off the spout, effectively disabling reliability.</p>
513 */
514 public static final String TOPOLOGY_ACKER_EXECUTORS = "topology.acker.executors";
515 public static final Object TOPOLOGY_ACKER_EXECUTORS_SCHEMA = Number.class;
516
517
518 /**
519 * The maximum amount of time given to the topology to fully process a message
520 * emitted by a spout. If the message is not acked within this time frame, Storm
521 * will fail the message on the spout. Some spout implementations will then replay
522 * the message at a later time.
523 */
524 public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";
525 public static final Object TOPOLOGY_MESSAGE_TIMEOUT_SECS_SCHEMA = Number.class;
526
527 /**
528 * A list of serialization registrations for Kryo (https://github.com/EsotericSoftware/kryo),
529 * the underlying serialization framework for Storm. A serialization can either
530 * be the name of a class (in which case Kryo will automatically create a serializer for the class
531 * that saves all the object's fields), or an implementation of com.esotericsoftware.kryo.Serializer.
532 *
533 * <p>See Kryo's documentation for more information about writing custom serializers.</p>
534 */
535 public static final String TOPOLOGY_KRYO_REGISTER = "topology.kryo.register";
536 public static final Object TOPOLOGY_KRYO_REGISTER_SCHEMA = ConfigValidation.StringsValidator;
537
538 /**
539 * A list of classes that customize Storm's Kryo instance during start-up.
540 * Each listed class name must implement IKryoDecorator. During start-up the
541 * listed class is instantiated with 0 arguments, then its 'decorate' method
542 * is called with Storm's Kryo instance as the only argument.
543 */
544 public static final String TOPOLOGY_KRYO_DECORATORS = "topology.kryo.decorators";
545 public static final Object TOPOLOGY_KRYO_DECORATORS_SCHEMA = ConfigValidation.StringsValidator;
546
547 /**
548 * Class that specifies how to create a Kryo instance for serialization. Storm will then apply
549 * topology.kryo.register and topology.kryo.decorators on top of this. The default implementation
550 * implements topology.fall.back.on.java.serialization and turns references off.
551 */
552 public static final String TOPOLOGY_KRYO_FACTORY = "topology.kryo.factory";
553 public static final Object TOPOLOGY_KRYO_FACTORY_SCHEMA = String.class;
554
555
556 /**
557 * Whether or not Storm should skip the loading of Kryo registrations for which it
558 * does not know the class or have the serializer implementation. Otherwise, the task will
559 * fail to load and will throw an error at runtime. The use case of this is if you want to
560 * declare your serializations in the storm.yaml files on the cluster rather than every single
561 * time you submit a topology. Different applications may use different serializations and so
562 * a single application may not have the code for the other serializers used by other apps.
563 * By setting this config to true, Storm will ignore that it doesn't have those other serializations
564 * rather than throw an error.
565 */
566 public static final String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS= "topology.skip.missing.kryo.registrations";
567 public static final Object TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS_SCHEMA = Boolean.class;
568
569 /**
570 * A list of classes implementing IMetricsConsumer (see storm.yaml.example for exact config format).
571 * Each listed class will be routed all the metrics data generated by the Storm metrics API.
572 * Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and its parallelism is configurable.
573 */
574 public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register";
575 public static final Object TOPOLOGY_METRICS_CONSUMER_REGISTER_SCHEMA = ConfigValidation.MapsValidator;
576
577
578 /**
579 * The maximum parallelism allowed for a component in this topology. This configuration is
580 * typically used in testing to limit the number of threads spawned in local mode.
581 */
582 public static final String TOPOLOGY_MAX_TASK_PARALLELISM="topology.max.task.parallelism";
583 public static final Object TOPOLOGY_MAX_TASK_PARALLELISM_SCHEMA = Number.class;
584
585
586 /**
587 * The maximum number of tuples that can be pending on a spout task at any given time.
588 * This config applies to individual tasks, not to spouts or topologies as a whole.
589 *
590 * <p>A pending tuple is one that has been emitted from a spout but has not been acked or failed yet.
591 * Note that this config parameter has no effect for unreliable spouts that don't tag
592 * their tuples with a message id.</p>
593 */
594 public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
595 public static final Object TOPOLOGY_MAX_SPOUT_PENDING_SCHEMA = Number.class;
596
597 /**
598 * A class that implements a strategy for what to do when a spout needs to wait. Waiting is
599 * triggered in one of two conditions:
600 * <ol>
601 * <li>nextTuple emits no tuples</li>
602 * <li>The spout has hit maxSpoutPending and can't emit any more tuples</li></ol>
603 */
604 public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY="topology.spout.wait.strategy";
605 public static final Object TOPOLOGY_SPOUT_WAIT_STRATEGY_SCHEMA = String.class;
606
607 /**
608 * The amount of milliseconds the SleepEmptyEmitStrategy should sleep for.
609 */
610 public static final String TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS="topology.sleep.spout.wait.strategy.time.ms";
611 public static final Object TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS_SCHEMA = Number.class;
612
613 /**
614 * The maximum amount of time a component gives a source of state to synchronize before it requests
615 * synchronization again.
616 */
617 public static final String TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS="topology.state.synchronization.timeout.secs";
618 public static final Object TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS_SCHEMA = Number.class;
619
620 /**
621 * The percentage of tuples to sample to produce stats for a task.
622 */
623 public static final String TOPOLOGY_STATS_SAMPLE_RATE="topology.stats.sample.rate";
624 public static final Object TOPOLOGY_STATS_SAMPLE_RATE_SCHEMA = Number.class;
625
626 /**
627 * The time period that builtin metrics data is bucketed into.
628 */
629 public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs";
630 public static final Object TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS_SCHEMA = Number.class;
631
632 /**
633 * Whether or not to fall back on Java serialization in a topology.
634 */
635 public static final String TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION="topology.fall.back.on.java.serialization";
636 public static final Object TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION_SCHEMA = Boolean.class;
637
638 /**
639 * Topology-specific options for the worker child process. This is used in addition to {@link #WORKER_CHILDOPTS}.
640 */
641 public static final String TOPOLOGY_WORKER_CHILDOPTS="topology.worker.childopts";
642 public static final Object TOPOLOGY_WORKER_CHILDOPTS_SCHEMA = String.class;
643
644 /**
645 * This config is available for TransactionalSpouts, and contains the id (a String) for
646 * the transactional topology. This id is used to store the state of the transactional
647 * topology in ZooKeeper.
648 */
649 public static final String TOPOLOGY_TRANSACTIONAL_ID="topology.transactional.id";
650 public static final Object TOPOLOGY_TRANSACTIONAL_ID_SCHEMA = String.class;
651
652 /**
653 * A list of task hooks that are automatically added to every spout and bolt in the topology. An example
654 * of when you'd do this is to add a hook that integrates with your internal
655 * monitoring system. These hooks are instantiated using the zero-arg constructor.
656 */
657 public static final String TOPOLOGY_AUTO_TASK_HOOKS="topology.auto.task.hooks";
658 public static final Object TOPOLOGY_AUTO_TASK_HOOKS_SCHEMA = ConfigValidation.StringsValidator;
659
660
661 /**
662 * The size of the Disruptor receive queue for each executor. Must be a power of 2.
663 */
664 public static final String TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE="topology.executor.receive.buffer.size";
665 public static final Object TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE_SCHEMA = ConfigValidation.PowerOf2Validator;
666
667 /**
668 * The maximum number of messages to batch from the thread receiving off the network to the
669 * executor queues. Must be a power of 2.
670 */
671 public static final String TOPOLOGY_RECEIVER_BUFFER_SIZE="topology.receiver.buffer.size";
672 public static final Object TOPOLOGY_RECEIVER_BUFFER_SIZE_SCHEMA = ConfigValidation.PowerOf2Validator;
673
674 /**
675 * The size of the Disruptor send queue for each executor. Must be a power of 2.
676 */
677 public static final String TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE="topology.executor.send.buffer.size";
678 public static final Object TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE_SCHEMA = ConfigValidation.PowerOf2Validator;
679
680 /**
681 * The size of the Disruptor transfer queue for each worker.
682 */
683 public static final String TOPOLOGY_TRANSFER_BUFFER_SIZE="topology.transfer.buffer.size";
684 public static final Object TOPOLOGY_TRANSFER_BUFFER_SIZE_SCHEMA = Number.class; // NOTE(review): the executor Disruptor queues above use PowerOf2Validator; confirm whether this queue needs the same check.
685
686 /**
687 * How often a tick tuple from the "__system" component and "__tick" stream should be sent
688 * to tasks. Meant to be used as a component-specific configuration.
689 */
690 public static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS="topology.tick.tuple.freq.secs";
691 public static final Object TOPOLOGY_TICK_TUPLE_FREQ_SECS_SCHEMA = Number.class;
692
693
694 /**
695 * Configure the wait strategy used for internal queuing. Can be used to trade off latency
696 * vs. throughput.
697 */
698 public static final String TOPOLOGY_DISRUPTOR_WAIT_STRATEGY="topology.disruptor.wait.strategy";
699 public static final Object TOPOLOGY_DISRUPTOR_WAIT_STRATEGY_SCHEMA = String.class;
700
701 /**
702 * The size of the shared thread pool for worker tasks to make use of. The thread pool can be accessed
703 * via the TopologyContext.
704 */
705 public static final String TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE="topology.worker.shared.thread.pool.size";
706 public static final Object TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE_SCHEMA = Number.class;
707
708 /**
709 * The interval in seconds to use for determining whether to throttle errors reported to ZooKeeper. For example,
710 * an interval of 10 seconds with topology.max.error.report.per.interval set to 5 will only allow 5 errors to be
711 * reported to ZooKeeper per task for every 10 second interval of time.
712 */
713 public static final String TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS="topology.error.throttle.interval.secs";
714 public static final Object TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS_SCHEMA = Number.class;
715
716 /**
717 * The maximum number of errors reported to ZooKeeper per task per interval; see {@link #TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS}.
718 */
719 public static final String TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL="topology.max.error.report.per.interval";
720 public static final Object TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL_SCHEMA = Number.class;
721
722
723 /**
724 * How often a batch can be emitted in a Trident topology.
725 */
726 public static final String TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS="topology.trident.batch.emit.interval.millis";
727 public static final Object TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS_SCHEMA = Number.class;
728
729 /**
730 * Name of the topology. This config is automatically set by Storm when the topology is submitted.
731 */
732 public static final String TOPOLOGY_NAME="topology.name";
733 public static final Object TOPOLOGY_NAME_SCHEMA = String.class;
734
735 /**
736 * Max pending tuples in one ShellBolt
737 */
738 public static final String TOPOLOGY_SHELLBOLT_MAX_PENDING="topology.shellbolt.max.pending";
739 public static final Object TOPOLOGY_SHELLBOLT_MAX_PENDING_SCHEMA = Number.class;
740
/**
 * The root directory in ZooKeeper for metadata about TransactionalSpouts.
 */
public static final String TRANSACTIONAL_ZOOKEEPER_ROOT="transactional.zookeeper.root";
public static final Object TRANSACTIONAL_ZOOKEEPER_ROOT_SCHEMA = String.class;

/**
 * The list of ZooKeeper servers in which to keep the transactional state. If null (the
 * default), storm.zookeeper.servers is used instead.
 */
public static final String TRANSACTIONAL_ZOOKEEPER_SERVERS="transactional.zookeeper.servers";
public static final Object TRANSACTIONAL_ZOOKEEPER_SERVERS_SCHEMA = ConfigValidation.StringsValidator;

/**
 * The port used to connect to the transactional ZooKeeper servers. If null (the default),
 * storm.zookeeper.port is used instead.
 */
public static final String TRANSACTIONAL_ZOOKEEPER_PORT="transactional.zookeeper.port";
public static final Object TRANSACTIONAL_ZOOKEEPER_PORT_SCHEMA = Number.class;

/**
 * The number of threads that should be used by the ZeroMQ context in each worker process.
 */
public static final String ZMQ_THREADS = "zmq.threads";
public static final Object ZMQ_THREADS_SCHEMA = Number.class;

/**
 * How long, in milliseconds, a connection should retry sending messages to a target host
 * when the connection is closed. This is an advanced configuration and can almost
 * certainly be ignored.
 */
public static final String ZMQ_LINGER_MILLIS = "zmq.linger.millis";
public static final Object ZMQ_LINGER_MILLIS_SCHEMA = Number.class;

/**
 * The high water mark (HWM) for the ZeroMQ push sockets used for networking. Use this config
 * to prevent buffer explosion on the networking layer.
 */
public static final String ZMQ_HWM = "zmq.hwm";
public static final Object ZMQ_HWM_SCHEMA = Number.class;

/**
 * This value is passed to spawned JVMs (e.g., Nimbus, Supervisor, and Workers)
 * as their java.library.path, which tells the JVM where to look for native libraries.
 * It must be set correctly because Storm uses the ZeroMQ and JZMQ native libs.
 */
public static final String JAVA_LIBRARY_PATH = "java.library.path";
public static final Object JAVA_LIBRARY_PATH_SCHEMA = String.class;

/**
 * The path to use as the ZooKeeper dir when running a ZooKeeper server via
 * "storm dev-zookeeper". This ZooKeeper instance is only intended for development;
 * it is not a production-grade ZooKeeper setup.
 */
public static final String DEV_ZOOKEEPER_PATH = "dev.zookeeper.path";
public static final Object DEV_ZOOKEEPER_PATH_SCHEMA = String.class;

/**
 * A map from topology name to the number of machines that should be dedicated to that
 * topology. Set storm.scheduler to {@code backtype.storm.scheduler.IsolationScheduler}
 * to make use of the isolation scheduler.
 */
public static final String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines";
public static final Object ISOLATION_SCHEDULER_MACHINES_SCHEMA = Map.class;
805
806 public static void setDebug(Map conf, boolean isOn) {
807 conf.put(Config.TOPOLOGY_DEBUG, isOn);
808 }
809
810 public void setDebug(boolean isOn) {
811 setDebug(this, isOn);
812 }
813
814 @Deprecated
815 public void setOptimize(boolean isOn) {
816 put(Config.TOPOLOGY_OPTIMIZE, isOn);
817 }
818
819 public static void setNumWorkers(Map conf, int workers) {
820 conf.put(Config.TOPOLOGY_WORKERS, workers);
821 }
822
823 public void setNumWorkers(int workers) {
824 setNumWorkers(this, workers);
825 }
826
827 public static void setNumAckers(Map conf, int numExecutors) {
828 conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, numExecutors);
829 }
830
831 public void setNumAckers(int numExecutors) {
832 setNumAckers(this, numExecutors);
833 }
834
835 public static void setMessageTimeoutSecs(Map conf, int secs) {
836 conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, secs);
837 }
838
839 public void setMessageTimeoutSecs(int secs) {
840 setMessageTimeoutSecs(this, secs);
841 }
842
843 public static void registerSerialization(Map conf, Class klass) {
844 getRegisteredSerializations(conf).add(klass.getName());
845 }
846
847 public void registerSerialization(Class klass) {
848 registerSerialization(this, klass);
849 }
850
851 public static void registerSerialization(Map conf, Class klass, Class<? extends Serializer> serializerClass) {
852 Map<String, String> register = new HashMap<String, String>();
853 register.put(klass.getName(), serializerClass.getName());
854 getRegisteredSerializations(conf).add(register);
855 }
856
857 public void registerSerialization(Class klass, Class<? extends Serializer> serializerClass) {
858 registerSerialization(this, klass, serializerClass);
859 }
860
861 public void registerMetricsConsumer(Class klass, Object argument, long parallelismHint) {
862 HashMap m = new HashMap();
863 m.put("class", klass.getCanonicalName());
864 m.put("parallelism.hint", parallelismHint);
865 m.put("argument", argument);
866
867 List l = (List)this.get(TOPOLOGY_METRICS_CONSUMER_REGISTER);
868 if(l == null) { l = new ArrayList(); }
869 l.add(m);
870 this.put(TOPOLOGY_METRICS_CONSUMER_REGISTER, l);
871 }
872
873 public void registerMetricsConsumer(Class klass, long parallelismHint) {
874 registerMetricsConsumer(klass, null, parallelismHint);
875 }
876
877 public void registerMetricsConsumer(Class klass) {
878 registerMetricsConsumer(klass, null, 1L);
879 }
880
881 public static void registerDecorator(Map conf, Class<? extends IKryoDecorator> klass) {
882 getRegisteredDecorators(conf).add(klass.getName());
883 }
884
885 public void registerDecorator(Class<? extends IKryoDecorator> klass) {
886 registerDecorator(this, klass);
887 }
888
889 public static void setKryoFactory(Map conf, Class<? extends IKryoFactory> klass) {
890 conf.put(Config.TOPOLOGY_KRYO_FACTORY, klass.getName());
891 }
892
893 public void setKryoFactory(Class<? extends IKryoFactory> klass) {
894 setKryoFactory(this, klass);
895 }
896
897 public static void setSkipMissingKryoRegistrations(Map conf, boolean skip) {
898 conf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, skip);
899 }
900
901 public void setSkipMissingKryoRegistrations(boolean skip) {
902 setSkipMissingKryoRegistrations(this, skip);
903 }
904
905 public static void setMaxTaskParallelism(Map conf, int max) {
906 conf.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, max);
907 }
908
909 public void setMaxTaskParallelism(int max) {
910 setMaxTaskParallelism(this, max);
911 }
912
913 public static void setMaxSpoutPending(Map conf, int max) {
914 conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, max);
915 }
916
917 public void setMaxSpoutPending(int max) {
918 setMaxSpoutPending(this, max);
919 }
920
921 public static void setStatsSampleRate(Map conf, double rate) {
922 conf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, rate);
923 }
924
925 public void setStatsSampleRate(double rate) {
926 setStatsSampleRate(this, rate);
927 }
928
929 public static void setFallBackOnJavaSerialization(Map conf, boolean fallback) {
930 conf.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, fallback);
931 }
932
933 public void setFallBackOnJavaSerialization(boolean fallback) {
934 setFallBackOnJavaSerialization(this, fallback);
935 }
936
937 private static List getRegisteredSerializations(Map conf) {
938 List ret;
939 if(!conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
940 ret = new ArrayList();
941 } else {
942 ret = new ArrayList((List) conf.get(Config.TOPOLOGY_KRYO_REGISTER));
943 }
944 conf.put(Config.TOPOLOGY_KRYO_REGISTER, ret);
945 return ret;
946 }
947
948 private static List getRegisteredDecorators(Map conf) {
949 List ret;
950 if(!conf.containsKey(Config.TOPOLOGY_KRYO_DECORATORS)) {
951 ret = new ArrayList();
952 } else {
953 ret = new ArrayList((List) conf.get(Config.TOPOLOGY_KRYO_DECORATORS));
954 }
955 conf.put(Config.TOPOLOGY_KRYO_DECORATORS, ret);
956 return ret;
957 }
958 }