Monday, July 4, 2016

Flume Client interaction with Source, Load balancing and failover mechanisms with Flume



Flume has a fully plugin-based architecture.

--The client (NettyAvroRpcClient or ThriftRpcClient) operates at the point of origin of events and delivers them to a Flume agent. Clients typically operate in the process space of the application they are consuming data from.
--The client needs to create an RpcClient object with the host and port of the target Flume agent, and can then use the RpcClient to send data to the agent.
--The Flume Client SDK is a library that enables applications to connect to Flume and send data into Flume's data flow over RPC.
--Flume currently supports Avro, log4j, syslog, and HTTP POST (with a JSON body) as ways to transfer data from an external source. Additionally, there's an ExecSource that can consume the output of a local process as input to Flume.
--As of Flume 1.4.0, Avro is the default RPC protocol.
Steps to go from the Client SDK API to a Flume agent with its source, channel and sink configured:
1) In the user's data-generating application, use the Client SDK API to write a class that starts executing from the driver method public static void main(String[] args). In it, initialize the client with the hostname and port of the Flume agent it should deliver to, build events from the local process's data, and send them to the Flume source agent, which is configured with matching properties in the flume.conf file.
Some important classes to remember are RpcClient, RpcClientFactory, Event and EventBuilder; client properties such as client.type, hosts and port numbers are set when the client is created, as in the minimal sketch below.
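Below is a minimal sketch of such a driver class using the Avro RPC client from the Flume Client SDK. The hostname, port and payload are placeholder values for illustration and must match the Avro source configured in step 2.

import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class MyFlumeRpcClient {
  public static void main(String[] args) {
    // Hostname and port are placeholders; they must match the Avro source
    // configured in flume.conf (see step 2).
    String hostname = "localhost";
    int port = 42424;

    RpcClient client = RpcClientFactory.getDefaultInstance(hostname, port);
    try {
      // Build an event from data generated by this application.
      Event event = EventBuilder.withBody("sample payload", Charset.forName("UTF-8"));
      client.append(event);   // deliver the event to the Flume Avro source
    } catch (EventDeliveryException e) {
      // Delivery failed; a real application would re-create the client and retry.
      e.printStackTrace();
    } finally {
      client.close();         // release the connection
    }
  }
}

Compile this against the flume-ng-sdk library and run it while the agent from step 2 is up.
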
2) Set the Flume agent's source type to avro (or thrift if using the Thrift client) and bind it to exactly the hostname and port the client was initialized with in step 1:
agentname1.sources.processA_avrosource.type = avro (or thrift)
agentname1.sources.processA_avrosource.port = 42424
agentname1.sources.processA_avrosource.bind = localhost

3) Within a user's data-generating application, when using the Flume Client SDK API to transfer data into a Flume Thrift source you will use a few classes such as Event, RpcClient, SecureRpcClientFactory, RpcClientConfigurationConstants and EventBuilder.

4) Only the Thrift source and sink support Kerberos-based authentication. In the client you need to set kerberos to true and provide client-principal, client-keytab and server-principal (the principal of the Flume Thrift source being connected to), then obtain the client through SecureRpcClientFactory. client-principal and client-keytab are the properties used by the Thrift sink to authenticate to the Kerberos KDC, and server-principal represents the principal of the Thrift source this sink is configured to connect to in secure mode. A hedged sketch of such a secure Thrift client follows.
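The sketch below builds a secure Thrift client with SecureRpcClientFactory; the host, port, principals and keytab path are example values only and must be replaced with the ones from your own Kerberos setup.

import java.nio.charset.Charset;
import java.util.Properties;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.SecureRpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class MySecureThriftRpcClient {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("client.type", "thrift");
    props.setProperty("hosts", "h1");
    // Host, port, principals and keytab below are placeholder values.
    props.setProperty("hosts.h1", "thriftsourcehost.example.org:41414");
    props.setProperty("kerberos", "true");
    props.setProperty("client-principal", "flumeclient/client.example.org@EXAMPLE.ORG");
    props.setProperty("client-keytab", "/etc/security/keytabs/flumeclient.keytab");
    props.setProperty("server-principal", "flume/thriftsourcehost.example.org@EXAMPLE.ORG");

    RpcClient client = SecureRpcClientFactory.getThriftInstance(props);
    try {
      Event event = EventBuilder.withBody("secure payload", Charset.forName("UTF-8"));
      client.append(event);   // authenticated delivery to the Thrift source
    } catch (EventDeliveryException e) {
      // Delivery failed; a real application would re-create the client and retry.
      e.printStackTrace();
    } finally {
      client.close();
    }
  }
}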

5) For the failover mechanism to work with Avro, client.type must be set to "default_failover", with the participating hosts given as aliases and host:port pairs (for example hosts = nm1 nm2 nm3, hosts.nm1 = host1.infovision.com:41414, hosts.nm2 = host2.infovision.com:41414, hosts.nm3 = host3.infovision.com:41414).
At present the FailoverRpcClient does not support Thrift. A sketch follows.
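A minimal sketch of a failover client follows, using RpcClientFactory.getInstance with a Properties object; the host names and ports are just the example values from above.

import java.nio.charset.Charset;
import java.util.Properties;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class MyFailoverRpcClient {
  public static void main(String[] args) throws EventDeliveryException {
    Properties props = new Properties();
    props.put("client.type", "default_failover");
    // Space-separated aliases; each alias maps to the host:port of an Avro source.
    props.put("hosts", "nm1 nm2 nm3");
    props.put("hosts.nm1", "host1.infovision.com:41414");
    props.put("hosts.nm2", "host2.infovision.com:41414");
    props.put("hosts.nm3", "host3.infovision.com:41414");

    RpcClient client = RpcClientFactory.getInstance(props);
    try {
      Event event = EventBuilder.withBody("failover payload", Charset.forName("UTF-8"));
      client.append(event);   // goes to nm1; falls back to nm2/nm3 if nm1 is down
    } finally {
      client.close();
    }
  }
}
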
6) The LoadBalancing RPC client currently does not support Thrift either. When there is more load to distribute among hosts for better performance, use the LoadBalancingRpcClient by setting client.type to "default_loadbalance", listing the hosts as aliases and host:port pairs (hosts = nm1 nm2 nm3, hosts.nm1 = host1.infovision.com:41414, hosts.nm2 = host2.infovision.com:41414, hosts.nm3 = host3.infovision.com:41414) and choosing host-selector = round_robin or random; see the sketch below.
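A minimal sketch of a load-balancing client follows; again the host names, ports and the number of events sent are just example values.

import java.nio.charset.Charset;
import java.util.Properties;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class MyLoadBalancingRpcClient {
  public static void main(String[] args) throws EventDeliveryException {
    Properties props = new Properties();
    props.put("client.type", "default_loadbalance");
    props.put("hosts", "nm1 nm2 nm3");                 // space-separated aliases
    props.put("hosts.nm1", "host1.infovision.com:41414");
    props.put("hosts.nm2", "host2.infovision.com:41414");
    props.put("hosts.nm3", "host3.infovision.com:41414");
    props.put("host-selector", "round_robin");         // or "random"

    RpcClient client = RpcClientFactory.getInstance(props);
    try {
      for (int i = 0; i < 10; i++) {
        Event event = EventBuilder.withBody("event " + i, Charset.forName("UTF-8"));
        client.append(event);   // events are spread across nm1, nm2 and nm3
      }
    } finally {
      client.close();
    }
  }
}
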

7) Failover Sink Processor: maintains a prioritized list of sinks, guaranteeing that as long as one is available, events will be processed (delivered).
When using the failover mechanism with sinks, set the following properties in the configuration file:
agentname1.sinkgroups = sgroups
agentname1.sinkgroups.sgroups.sinks = k1 k2
agentname1.sinkgroups.sgroups.processor.type = failover
agentname1.sinkgroups.sgroups.processor.priority.k1 = 5
agentname1.sinkgroups.sgroups.processor.priority.k2 = 10
agentname1.sinkgroups.sgroups.processor.maxpenalty = 10000

8) Load Balancing Sink Processor: provides the ability to load-balance the flow over multiple sinks.
When using the load-balancing mechanism with sinks, set the following properties in the configuration file:
agentname1.sinkgroups = sgroups
agentname1.sinkgroups.sgroups.sinks = k1 k2
agentname1.sinkgroups.sgroups.processor.type = load_balance
agentname1.sinkgroups.sgroups.processor.backoff = true (if backoff is enabled, the sink processor will blacklist sinks that fail, removing them from selection for a given timeout)
agentname1.sinkgroups.sgroups.processor.selector = random/round_robin
agentname1.sinkgroups.sgroups.processor.selector.maxTimeOut = 30000 (used by backoff selectors to limit exponential backoff, in milliseconds)

Custom selection mechanisms are supported via custom classes that inherit from AbstractSinkSelector, as in the rough sketch below.
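As an illustration only, here is a rough sketch of such a custom selector. The overridden method names (setSinks, createSinkIterator, informSinkFailed) follow the LoadBalancingSinkProcessor.SinkSelector contract as I understand it and should be verified against the Flume version in use; the class and selection policy are hypothetical.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

import org.apache.flume.Sink;
import org.apache.flume.sink.AbstractSinkSelector;

// Hypothetical custom selector: returns the sinks in a fresh random order
// every time the processor asks for a selection.
public class ShuffledSinkSelector extends AbstractSinkSelector {

  private final Random random = new Random();
  private List<Sink> allSinks = Collections.emptyList();

  @Override
  public void setSinks(List<Sink> sinks) {
    // Keep our own copy of the sink list handed over by the processor.
    this.allSinks = new ArrayList<Sink>(sinks);
  }

  @Override
  public Iterator<Sink> createSinkIterator() {
    List<Sink> shuffled = new ArrayList<Sink>(allSinks);
    Collections.shuffle(shuffled, random);
    return shuffled.iterator();
  }

  @Override
  public void informSinkFailed(Sink failedSink) {
    // No backoff handling in this sketch; a real selector could track failures here.
  }
}

To use it, set agentname1.sinkgroups.sgroups.processor.selector to the selector's fully qualified class name and make its jar available to the agent (for example via plugins.d, described below).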

9) To run a Flume agent, from a bash shell in the Flume bin directory:
flume-ng agent --name agentname1 --conf conf --conf-file /home/kumar/flume/flume_agent1.properties -Dflume.root.logger=INFO,console

Hint: flume-ng is a startup script.

10)Flume has a fully plugin-based architecture.

While it has always been possible to include custom Flume components by adding their jars to the FLUME_CLASSPATH variable in the flume-env.sh file, Flume now supports a special directory called plugins.d which automatically picks up plugins that are packaged in a specific format. This allows for easier management of plugin packaging issues as well as simpler debugging and troubleshooting of several classes of issues, especially library dependency conflicts.

The plugins.d directory is located at $FLUME_HOME/plugins.d.

 At startup time, the flume-ng start script looks in the plugins.d directory for plugins that conform to the below format and includes them in proper paths when starting up java.
Each plugin (subdirectory) within plugins.d can have up to three sub-directories:

  1. lib - the plugin’s jar(s)
  2. libext - the plugin’s dependency jar(s)
  3. native - any required native libraries, such as .so files
Example of two plugins within the plugins.d directory:

$FLUME_HOME/plugins.d/
$FLUME_HOME/plugins.d/customdb-source1/
$FLUME_HOME/plugins.d/customdb-source1/lib/my-dbsource.jar
$FLUME_HOME/plugins.d/customdb-source1/libext/spring-core-2.5.6.jar
$FLUME_HOME/plugins.d/customdb-source2/
$FLUME_HOME/plugins.d/customdb-source2/lib/custom_db.jar
$FLUME_HOME/plugins.d/customdb-source2/native/gettext.so
 
11) Using Zookeeper-based configuration:
$ bin/flume-ng agent --conf conf -z zkserver1:2181,zkserver2:2181 -p /flume --name a1 -Dflume.root.logger=INFO,console
   The configuration file needs to be uploaded to Zookeeper under a
   configurable prefix, and is stored in the Zookeeper node data.
   Following is how the Zookeeper node tree would look for agents a1 and a2:
     -/flume (flume node tree base path)
           -/a1 (a1 agent configuration file)
           -/a2 (a2 agent configuration file)
 
12) Data ingestion can be done in a number of ways: Flume supports
    several mechanisms to ingest data from external sources.
    - RPC, executing commands through an "Exec" source, and
    - network streams (Avro, Thrift, Syslog and Netcat).

    An Avro client included in the Flume distribution can send a
    given file to a Flume Avro source using the Avro RPC mechanism:
     $ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10

    (The above command will send the contents of /usr/logs/log.10
     to the Flume source listening on that port.)
 
13) Flume sources: Avro Source, Thrift Source, Exec Source,
    JMS Source (with converter), Spooling Directory Source,
    Twitter 1% Firehose Source, Kafka Source,
    NetCat Source, Sequence Generator Source, Syslog Sources
    (Syslog TCP, Syslog UDP, Multiport Syslog TCP), HTTP Source,
    Stress Source, Legacy Sources (Avro Legacy Source, Thrift
    Legacy Source), Custom Source, Scribe Source
 
14) Flume sinks: HDFS Sink, Hive Sink, Logger Sink, Avro Sink,
    Thrift Sink, IRC Sink, Null Sink, File Roll Sink, HBase sinks
    (HBaseSink, AsyncHBaseSink), MorphlineSolrSink, ElasticSearchSink
    (this sink writes data to an Elasticsearch cluster; by default,
    events are written so that the Kibana graphical interface can
    display them, just as if logstash wrote them), Kite Dataset Sink,
    Kafka Sink, Custom Sink
 
15) Channels: Memory Channel, JDBC Channel, File Channel,
    Kafka Channel, Spillable Memory Channel, Pseudo Transaction
    Channel (only for unit testing purposes and NOT meant for
    production use), Custom Channel
 
16) Flume channel selectors: Replicating Channel Selector (default),
    Multiplexing Channel Selector
 
17) Flume sink processors: Failover Sink Processor, Load Balancing
    Sink Processor, Custom Sink Processor
 

Sunday, July 3, 2016

For Oozie HA, a prerequisite is installing MySQL server on the Oozie server node.

On this cluster the Hive database is backed by MySQL server on the node named hiveserver, and node2 has the Oozie server installed. To make the Oozie server highly available, the backend must be a database that supports concurrent connections (which MySQL does).

Here hiveserver is the FQDN of a node in this cluster setup, used just as an example for demonstration.

SSH into hiveserver as shown below.

(Screenshot: a sample truncated output of sudo service --status-all)

(Screenshot: finding mysqld in the service list)

(Screenshot: the MySQL server my.cnf configuration file)

1) Another installation of MySQL server is done on the node with FQDN node2 in this cluster, just for demonstration.

bashshell> ssh root@node2 (provide the password when asked)

2) Once logged in to node2, the bash shell prompt looks like below:
[root@node2]

3) Using the yum package installer, install mysql-server:

[root@node2] yum install mysql-server

4) After the installation is successful, start mysqld from the sbin folder as shown below.

(Screenshot: starting mysqld)

5) Run the mysql_secure_installation script in the /usr/bin/ folder to secure your mysql-server installation: remove the default databases, change the root password, and remove any anonymous users.
This is for demo purposes only, so all users and the test database were kept unchanged here, but production and other cluster requirements will differ.

 
6) Log in to the mysql CLI as shown below.

(Screenshot: mysql command-line login)


7) When enabling Oozie HA you need a database other than the default Derby as the persistence store for the Oozie server, because Derby can only serve a single connection at a time.

So when running Ambari as a monitoring tool for the cluster, Ambari needs to connect to the MySQL instance too. Execute the corresponding command on the Ambari server host to make the JDBC connection available so that the database connection can be tested.

8) To enable Oozie HA, deploy two Oozie servers on two different hosts:

a) oozie-server1 installed on the hiveserver node
b) oozie-server2 installed on the node2 node

Then go to the Configs tab and change the following properties so that both servers reflect the same values:

c) oozie.zookeeper.connection.url = <list of ZooKeeper nodes>
d) oozie.services.ext = org.apache.oozie.service.ZKLocksService,org.apache.oozie.service.ZKXLogStreamingService,
org.apache.oozie.service.ZKJobsConcurrencyService
e) oozie.base.url = http://haproxy.node2:11000/oozie
and export the base URL:
f) export OOZIE_BASE_URL="http://haproxy.node2:11000/oozie"
g) oozie.zookeeper.namespace = oozie


In the haproxy.cfg file we need to change a few properties as shown below.

frontend http
  # incoming client requests arrive here on port 11000
  bind *:11000
  mode http

  # match requests whose URL path begins with /oozie
  acl url_oozie path_beg /oozie

  use_backend oozie-backend if url_oozie

  default_backend web-backend

-------------------
backend oozie-backend

   balance roundrobin
   mode http
   # the two Oozie servers behind the proxy; server names must be unique
   server oozie1 oozie.hiveserver:11000 check
   server oozie2 oozie.node2:11000 check

----------------------------

Backend

A backend is a set of servers that receives forwarded requests. Backends are defined in the backend section of the HAProxy configuration. In its most basic form, a backend can be defined by:
  • which load balance algorithm to use
  • a list of servers and ports

---mode http specifies that layer 7 proxying will be used.

Frontend

A frontend defines how requests should be forwarded to backends. Frontends are defined in the frontend section of the HAProxy configuration. Their definitions are composed of the following components:


  • a set of IP addresses and a port (e.g. 10.1.1.7:80, *:443, etc.)
  • ACLs
  • use_backend rules, which define which backends to use depending on which ACL conditions are matched, and/or a default_backend rule that handles every other case

Layer 7 Load Balancing

Another, more complex way to load balance network traffic is to use layer 7 (application layer) load balancing. Using layer 7 allows the load balancer to forward requests to different backend servers based on the content of the user's request.