cyberjoac
June 16th, 2009, 08:44 AM
Hi everybody, it's been two days I've been trying to solve that problem, but without luck...
I wrote a simple p2p application which makes an addition in a distributed way (each peer contacts its neighbors and ask them for the number to add and then print the result after 5 seconds)
My problem is the following: When my client contacts all his neighbors in a sequential way (without threads), there is no problem, except that it is very unefficient and could block all the connections for a non responding one...
The logical solution out of this is to use pthreads and to connect to each neighbor using different threads.
But then appears a new problem:
The connection goes allright (apparently) but then when I'm trying to call recv(), it returns 0, meaning that the opposite end just closed the connection and that there is nothing to read from...
This even if I don't call the close() command at any point in the server...
That's it I post the code, if you can have a look it would really help me so thanks again...
The interesting methods are : setServerSocket() and retrieveNextNode().
#include "peer.h"
using namespace std;
peer* signalHandler;
// Constructor for lru.
peer::peer(char* fileName) {
_fileName = fileName;
_peerNumber = 0;
_sum = 0;
pthread_mutex_init(&_mutex, NULL);
// Creates a new thread that will listens for incoming connections.
pthread_create(&_serverThread, NULL, peer::setServer, (void*) this);
// We set the global variable signal handler so that the static function
// will know which object to use to handle the alarm signal.
signalHandler = this;
// Sets the alarm signal handler.
// Parses the given input file.
parseInputFile();
_iterator = _neighborList.begin();
}
// Sets up the server socket
void peer::setServerSocket() {
// Sets up the socket.
struct addrinfo hints;
struct addrinfo *res;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE; // Sets my ip adddress.
getaddrinfo(NULL, "1030", &hints, &res);
// Make a socket.
int sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
// Bind the socket to the right port.
bind(sockfd, res->ai_addr, res->ai_addrlen);
// Listen.
listen(sockfd, 10);
while(1) {
struct sockaddr_storage their_addr;
socklen_t addr_size = sizeof their_addr;
int newSocket;
newSocket = accept(sockfd, (struct sockaddr*) &their_addr, &addr_size);
// MAYBE WE NEED TO CREATE ANOTHER THREAD HERE.
char tosend[50];
sprintf(tosend,"%g", _inputValue);
// Send it.
send(newSocket, tosend, strlen(tosend), 0);
// Close the sending socket.
// close(newSocket);
}
}
// Adds its own sum, and then contacts all the other neighbors to
// add sequentially their data.
void peer::computeSum() {
_sum += _inputValue;
_peerNumber++;
while (_iterator != _neighborList.end()) {
pthread_t thread;
// Creates a new thread that will launch the function retrieveNextNode.
pthread_create(&thread, NULL, peer::retrieve_func, (void*) this);
// retrieveNextNode();
// Next time we call retrieveNextNode, will be ready.
_iterator++;
}
}
// Connects to the next node and retrieves its data and try to add it, disconnects when done.
void peer::retrieveNextNode() {
string ipAddress = _iterator->getIp();
string port = _iterator->getPort();
// Sets up the socket.
struct addrinfo hints;
struct addrinfo *res;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
getaddrinfo("localhost", "1030", &hints, &res);
// getaddrinfo(ipAddress.c_str(), port.c_str(), &hints, &res);
// Make a socket.
int sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
// Connect.
if (connect(sockfd, res->ai_addr, res->ai_addrlen)!= 0) {
cerr << "Error on connection" << endl;
return;
}
// Read the value sent by the other host, and add it to the sum.
char msgReceived[50];
int len = strlen(msgReceived);
recv(sockfd, msgReceived, len, 0);
// Critical section, updates the sum and the number of peers.
pthread_mutex_lock(&_mutex);
_sum += atof(msgReceived);
_peerNumber++;
pthread_mutex_unlock(&_mutex);
// Close the connection to this neighbor.
close(sockfd);
}
void catch_alarm(int sig_num) {
signalHandler->printOutput();
}
peer::~peer() {
}
// Parses the input file and puts all the data in data structures.
void peer::parseInputFile() {
_inputFile.open(_fileName);
string inputValue = getNextLine();
_inputValue = atof(inputValue.c_str());
string threashold = getNextLine();
_threashold = atoi(threashold.c_str());
startTimer();
// THERE IS A PROBLEM WITH PARSING HERE
_port = getNextLine();
// string::iterator it = _port.end() - 1;
// _port.erase(it);
//cout << strlen(_port.c_str());
parseNeighborList();
}
// Starts the timer with the indicated threashold.
void peer::startTimer() {
alarm(_threashold);
}
void peer::parseNeighborList() {
while (!_inputFile.eof()) {
string neighbor = getNextLine();
if (!neighbor.empty()) {
size_t colonSeparator = neighbor.find(":");
string ipAddress = neighbor.substr(0, colonSeparator);
string port = neighbor.substr(colonSeparator+1, neighbor.length() - colonSeparator - 2);
peerAddress toAdd(ipAddress, port);
_neighborList.push_back(toAdd);
}
}
}
// Returns the next non empty line.
string peer::getNextLine() {
// We loop until we find a non empty line.
string trimmed;
do {
_inputFile.getline(_buffer, 256);
trimmed = trimLine(_buffer);
} while(trimmed.length() == 1 && !_inputFile.eof());
return trimmed;
}
string peer::trimLine(const char* toTrim) {
string trimmed = string(toTrim);
// Trims the white spaces.
remove(trimmed.begin(), trimmed.end(), ' ');
return trimmed;
}
void peer::printOutput() {
printf("%f\t%d\n", _sum, _peerNumber);
}
void peer::printDebug() {
cout << "Input value: " << _inputValue << endl;
cout << "Time threashold: " << _threashold << endl;
cout << "Port: " << _port << endl;
cout << "Neighbors list: " << endl;
list<peerAddress>::iterator it;
for (it = _neighborList.begin(); it != _neighborList.end(); it++) {
cout << "Ip: " << it->getIp() << ", port: " << it->getPort() << endl;
}
}
int main(int argc, char **argv)
{
// The text file name is given as an input.
if(argc != 2) {
fprintf(stderr,"usage: peer <filename>\n");
exit(1);
}
signal(SIGALRM, catch_alarm);
peer myPeer(argv[1]);
myPeer.printDebug();
myPeer.computeSum();
pthread_exit(NULL);
return 0;
}
#ifndef PEER_H
#define PEER_H
#include <list>
#include <iostream>
#include <fstream>
#include <cstdlib>
#include <cstring>
#include <stdio.h>
#include <stdlib.h>
#include <algorithm>
#include <csignal>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <netdb.h>
#include <cassert>
#include <signal.h>
#include <netinet/in.h>
#include "peerAddress.h"
class peer {
static void* retrieve_func(void* d) {
((peer*) d)->retrieveNextNode();
return NULL;
}
static void* setServer(void* d) {
((peer*) d)->setServerSocket();
return NULL;
}
private:
char* _fileName;
char _buffer[256];
std::ifstream _inputFile;
double _inputValue;
int _threashold;
std::string _port;
int _peerNumber;
double _sum;
pthread_mutex_t _mutex;
std::list<peerAddress> _neighborList;
std::list<peerAddress>::iterator _iterator;
pthread_t _serverThread;
// Server socket --> listener (will need to create other when incoming connections.
int s;
public:
peer(char* fileName);
~peer();
void parseInputFile();
void parseNeighborList();
std::string trimLine(const char* toTrim);
std::string getNextLine();
void printOutput();
void startTimer();
void printDebug();
void computeSum();
void retrieveNextNode();
void setServerSocket();
};
#endif
Merci encore!!
I wrote a simple p2p application which makes an addition in a distributed way (each peer contacts its neighbors and ask them for the number to add and then print the result after 5 seconds)
My problem is the following: When my client contacts all his neighbors in a sequential way (without threads), there is no problem, except that it is very unefficient and could block all the connections for a non responding one...
The logical solution out of this is to use pthreads and to connect to each neighbor using different threads.
But then appears a new problem:
The connection goes allright (apparently) but then when I'm trying to call recv(), it returns 0, meaning that the opposite end just closed the connection and that there is nothing to read from...
This even if I don't call the close() command at any point in the server...
That's it I post the code, if you can have a look it would really help me so thanks again...
The interesting methods are : setServerSocket() and retrieveNextNode().
#include "peer.h"
using namespace std;
peer* signalHandler;
// Constructor for lru.
peer::peer(char* fileName) {
_fileName = fileName;
_peerNumber = 0;
_sum = 0;
pthread_mutex_init(&_mutex, NULL);
// Creates a new thread that will listens for incoming connections.
pthread_create(&_serverThread, NULL, peer::setServer, (void*) this);
// We set the global variable signal handler so that the static function
// will know which object to use to handle the alarm signal.
signalHandler = this;
// Sets the alarm signal handler.
// Parses the given input file.
parseInputFile();
_iterator = _neighborList.begin();
}
// Sets up the server socket
void peer::setServerSocket() {
// Sets up the socket.
struct addrinfo hints;
struct addrinfo *res;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE; // Sets my ip adddress.
getaddrinfo(NULL, "1030", &hints, &res);
// Make a socket.
int sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
// Bind the socket to the right port.
bind(sockfd, res->ai_addr, res->ai_addrlen);
// Listen.
listen(sockfd, 10);
while(1) {
struct sockaddr_storage their_addr;
socklen_t addr_size = sizeof their_addr;
int newSocket;
newSocket = accept(sockfd, (struct sockaddr*) &their_addr, &addr_size);
// MAYBE WE NEED TO CREATE ANOTHER THREAD HERE.
char tosend[50];
sprintf(tosend,"%g", _inputValue);
// Send it.
send(newSocket, tosend, strlen(tosend), 0);
// Close the sending socket.
// close(newSocket);
}
}
// Adds its own sum, and then contacts all the other neighbors to
// add sequentially their data.
void peer::computeSum() {
_sum += _inputValue;
_peerNumber++;
while (_iterator != _neighborList.end()) {
pthread_t thread;
// Creates a new thread that will launch the function retrieveNextNode.
pthread_create(&thread, NULL, peer::retrieve_func, (void*) this);
// retrieveNextNode();
// Next time we call retrieveNextNode, will be ready.
_iterator++;
}
}
// Connects to the next node and retrieves its data and try to add it, disconnects when done.
void peer::retrieveNextNode() {
string ipAddress = _iterator->getIp();
string port = _iterator->getPort();
// Sets up the socket.
struct addrinfo hints;
struct addrinfo *res;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
getaddrinfo("localhost", "1030", &hints, &res);
// getaddrinfo(ipAddress.c_str(), port.c_str(), &hints, &res);
// Make a socket.
int sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
// Connect.
if (connect(sockfd, res->ai_addr, res->ai_addrlen)!= 0) {
cerr << "Error on connection" << endl;
return;
}
// Read the value sent by the other host, and add it to the sum.
char msgReceived[50];
int len = strlen(msgReceived);
recv(sockfd, msgReceived, len, 0);
// Critical section, updates the sum and the number of peers.
pthread_mutex_lock(&_mutex);
_sum += atof(msgReceived);
_peerNumber++;
pthread_mutex_unlock(&_mutex);
// Close the connection to this neighbor.
close(sockfd);
}
void catch_alarm(int sig_num) {
signalHandler->printOutput();
}
peer::~peer() {
}
// Parses the input file and puts all the data in data structures.
void peer::parseInputFile() {
_inputFile.open(_fileName);
string inputValue = getNextLine();
_inputValue = atof(inputValue.c_str());
string threashold = getNextLine();
_threashold = atoi(threashold.c_str());
startTimer();
// THERE IS A PROBLEM WITH PARSING HERE
_port = getNextLine();
// string::iterator it = _port.end() - 1;
// _port.erase(it);
//cout << strlen(_port.c_str());
parseNeighborList();
}
// Starts the timer with the indicated threashold.
void peer::startTimer() {
alarm(_threashold);
}
void peer::parseNeighborList() {
while (!_inputFile.eof()) {
string neighbor = getNextLine();
if (!neighbor.empty()) {
size_t colonSeparator = neighbor.find(":");
string ipAddress = neighbor.substr(0, colonSeparator);
string port = neighbor.substr(colonSeparator+1, neighbor.length() - colonSeparator - 2);
peerAddress toAdd(ipAddress, port);
_neighborList.push_back(toAdd);
}
}
}
// Returns the next non empty line.
string peer::getNextLine() {
// We loop until we find a non empty line.
string trimmed;
do {
_inputFile.getline(_buffer, 256);
trimmed = trimLine(_buffer);
} while(trimmed.length() == 1 && !_inputFile.eof());
return trimmed;
}
string peer::trimLine(const char* toTrim) {
string trimmed = string(toTrim);
// Trims the white spaces.
remove(trimmed.begin(), trimmed.end(), ' ');
return trimmed;
}
void peer::printOutput() {
printf("%f\t%d\n", _sum, _peerNumber);
}
void peer::printDebug() {
cout << "Input value: " << _inputValue << endl;
cout << "Time threashold: " << _threashold << endl;
cout << "Port: " << _port << endl;
cout << "Neighbors list: " << endl;
list<peerAddress>::iterator it;
for (it = _neighborList.begin(); it != _neighborList.end(); it++) {
cout << "Ip: " << it->getIp() << ", port: " << it->getPort() << endl;
}
}
int main(int argc, char **argv)
{
// The text file name is given as an input.
if(argc != 2) {
fprintf(stderr,"usage: peer <filename>\n");
exit(1);
}
signal(SIGALRM, catch_alarm);
peer myPeer(argv[1]);
myPeer.printDebug();
myPeer.computeSum();
pthread_exit(NULL);
return 0;
}
#ifndef PEER_H
#define PEER_H
#include <list>
#include <iostream>
#include <fstream>
#include <cstdlib>
#include <cstring>
#include <stdio.h>
#include <stdlib.h>
#include <algorithm>
#include <csignal>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <netdb.h>
#include <cassert>
#include <signal.h>
#include <netinet/in.h>
#include "peerAddress.h"
class peer {
static void* retrieve_func(void* d) {
((peer*) d)->retrieveNextNode();
return NULL;
}
static void* setServer(void* d) {
((peer*) d)->setServerSocket();
return NULL;
}
private:
char* _fileName;
char _buffer[256];
std::ifstream _inputFile;
double _inputValue;
int _threashold;
std::string _port;
int _peerNumber;
double _sum;
pthread_mutex_t _mutex;
std::list<peerAddress> _neighborList;
std::list<peerAddress>::iterator _iterator;
pthread_t _serverThread;
// Server socket --> listener (will need to create other when incoming connections.
int s;
public:
peer(char* fileName);
~peer();
void parseInputFile();
void parseNeighborList();
std::string trimLine(const char* toTrim);
std::string getNextLine();
void printOutput();
void startTimer();
void printDebug();
void computeSum();
void retrieveNextNode();
void setServerSocket();
};
#endif
Merci encore!!